You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/04/14 04:12:05 UTC

[iotdb] branch rel/0.13 updated: [To rel/0.13] M4 function & Equal Size Bucket Sample functions (#9538)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.13 by this push:
     new 6728106468 [To rel/0.13] M4 function & Equal Size Bucket Sample functions (#9538)
6728106468 is described below

commit 6728106468d243a8be2f2a131a68942b4df3f71b
Author: Rui,Lei <33...@users.noreply.github.com>
AuthorDate: Fri Apr 14 12:11:59 2023 +0800

    [To rel/0.13] M4 function & Equal Size Bucket Sample functions (#9538)
    
    Currently M4 is implemented only in v1.0 and later versions. However, according to user needs, M4 is also required in v0.13. So this pr aims to add M4 function and also Equal Size Bucket Sample functions (EqualSizeBucketM4Sample is one of them) into rel/0.13. No new features but adapt the related codes, tests, documents, site references from master into rel/0.13.
    
    Changes:
    1. remove old M4 documents and site references
        - remove old M4 documents M4.md (bilingual)
        - remove references of old M4 documents in the site configuration file config.js
    
    2. add new codes, tests, documents, site references for M4 function (keep the contents same as those in the master branch 44f046c)
        - M4 source code UDTFM4.java, BuiltinFunction.java
        - M4 integration tests added in IoTDBUDTFBuiltinFunctionIT.java
        - M4 documents added in Select-Expression.md (bilingual)
        - the site configuration file config.js already references the existing Select-Expression.md, so no need to add
    
    3. add new codes, tests, documents, site references for Equal Size Bucket Sample functions (keep the contents same as those in the master branch 44f046c)
        - Equal Size Bucket Sample source code UDTFEqualSizeBucketSample.java, UDTFEqualSizeBucketRandomSample.java, UDTFEqualSizeBucketAggSample.java, UDTFEqualSizeBucketM4Sample.java, UDTFEqualSizeBucketOutlierSample.java, BuiltinFunction.java
        - Equal Size Bucket Sample integration tests added in IoTDBUDTFBuiltinFunctionIT.java
        - Equal Size Bucket Sample documents added in Select-Expression.md (bilingual)
        - the site configuration file config.js already references the existing Select-Expression.md, so no need to add
---
 client-cpp/pom.xml                                 |   4 +-
 compile-tools/pom.xml                              |   6 +-
 distribution/pom.xml                               |   2 +-
 docs/UserGuide/Query-Data/Select-Expression.md     | 421 ++++++++++-
 docs/UserGuide/UDF-Library/M4.md                   |  92 ---
 docs/zh/UserGuide/Query-Data/Select-Expression.md  | 470 +++++++++++-
 docs/zh/UserGuide/UDF-Library/M4.md                |  93 ---
 example/client-cpp-example/pom.xml                 |   2 +-
 example/trigger/pom.xml                            |   2 +-
 example/udf/pom.xml                                |   2 +-
 grafana-connector/pom.xml                          |   2 +-
 grafana-plugin/pom.xml                             |   8 +-
 integration/pom.xml                                |   6 +-
 .../db/integration/IoTDBUDTFBuiltinFunctionIT.java | 517 +++++++++++++
 jdbc/pom.xml                                       |   2 +-
 pom.xml                                            |   8 +-
 .../db/query/udf/builtin/BuiltinFunction.java      |   8 +-
 .../udf/builtin/UDTFEqualSizeBucketAggSample.java  | 493 ++++++++++++
 .../udf/builtin/UDTFEqualSizeBucketM4Sample.java   | 237 ++++++
 .../builtin/UDTFEqualSizeBucketOutlierSample.java  | 834 +++++++++++++++++++++
 .../builtin/UDTFEqualSizeBucketRandomSample.java   |  69 ++
 .../udf/builtin/UDTFEqualSizeBucketSample.java     |  48 ++
 .../apache/iotdb/db/query/udf/builtin/UDTFM4.java  | 312 ++++++++
 site/src/main/.vuepress/config.js                  |  12 +-
 24 files changed, 3392 insertions(+), 258 deletions(-)

diff --git a/client-cpp/pom.xml b/client-cpp/pom.xml
index 0cca39c3af..a3bf0769b9 100644
--- a/client-cpp/pom.xml
+++ b/client-cpp/pom.xml
@@ -116,8 +116,8 @@
                 <cmake.root.dir>${project.parent.basedir}/compile-tools/thrift/target/cmake-${cmake-version}-win64-x64/</cmake.root.dir>
                 <thrift.exec.absolute.path>${project.parent.basedir}/compile-tools/thrift/target/build/compiler/cpp/bin/${cmake.build.type}/thrift.exe</thrift.exec.absolute.path>
                 <iotdb.server.script>start-server.bat</iotdb.server.script>
-                <boost.include.dir />
-                <boost.library.dir />
+                <boost.include.dir/>
+                <boost.library.dir/>
             </properties>
         </profile>
         <profile>
diff --git a/compile-tools/pom.xml b/compile-tools/pom.xml
index 03328fcaa2..b3993eea2b 100644
--- a/compile-tools/pom.xml
+++ b/compile-tools/pom.xml
@@ -35,7 +35,7 @@
         <cmake-version>3.17.3</cmake-version>
         <openssl.include.dir>-Dtrue1=true1</openssl.include.dir>
         <bison.executable.dir>-Dtrue1=true1</bison.executable.dir>
-        <cmake.build.type />
+        <cmake.build.type/>
     </properties>
     <modules>
         <module>thrift</module>
@@ -138,8 +138,8 @@
                 <thrift.make.executable>make</thrift.make.executable>
                 <thrift.compiler.executable>thrift.exe</thrift.compiler.executable>
                 <gradlew.executable>gradlew.bat</gradlew.executable>
-                <boost.include.dir />
-                <boost.library.dir />
+                <boost.include.dir/>
+                <boost.library.dir/>
             </properties>
         </profile>
     </profiles>
diff --git a/distribution/pom.xml b/distribution/pom.xml
index cd76a024fd..e5da644429 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -29,7 +29,7 @@
     </parent>
     <artifactId>iotdb-distribution</artifactId>
     <name>IoTDB Distribution</name>
-    <modules />
+    <modules/>
     <build>
         <plugins>
             <plugin>
diff --git a/docs/UserGuide/Query-Data/Select-Expression.md b/docs/UserGuide/Query-Data/Select-Expression.md
index 29cecd1f35..960774a8d2 100644
--- a/docs/UserGuide/Query-Data/Select-Expression.md
+++ b/docs/UserGuide/Query-Data/Select-Expression.md
@@ -373,7 +373,7 @@ It costs 0.009s
 
 
 
-#### Syntax
+#### Example
 Example data:
 ```
 IoTDB> select text from root.test;
@@ -429,7 +429,7 @@ IoTDB> select ts from root.test;
 +-----------------------------+------------+
 ```
 
-##### Test 1
+#### Example 1
 SQL:
 ```sql
 select ts, on_off(ts, 'threshold'='2') from root.test;
@@ -448,7 +448,7 @@ IoTDB> select ts, on_off(ts, 'threshold'='2') from root.test;
 +-----------------------------+------------+-------------------------------------+
 ```
 
-##### Test 2
+#### Example 2
 Sql:
 ```sql
 select ts, in_range(ts, 'lower'='2', 'upper'='3.1') from root.test;
@@ -480,7 +480,7 @@ They can be divided into two categories according to return value:
 | ZERO_COUNT        | INT32/ INT64/ FLOAT/ DOUBLE/ BOOLEAN | `min`:Optional with default value `1L`<br>`max`:Optional with default value `Long.MAX_VALUE` | Long              | Return intervals' start times and the number of data points in the interval in which the value is always 0(false). Data points number `n` satisfy `n >= min && n <= max`     |
 | NON_ZERO_COUNT    | INT32/ INT64/ FLOAT/ DOUBLE/ BOOLEAN | `min`:Optional with default value `1L`<br>`max`:Optional with default value `Long.MAX_VALUE` | Long              | Return intervals' start times and the number of data points in the interval in which the value is always not 0(false). Data points number `n` satisfy `n >= min && n <= max` |
 
-##### Demonstrate
+#### Example
 Example data:
 ```
 IoTDB> select s1,s2,s3,s4,s5 from root.sg.d2;
@@ -519,6 +519,407 @@ Result:
 +-----------------------------+-------------+-------------------------+-----------------------------+----------------------------+--------------------------------+
 ```
 
+### Equal Size Bucket Sample Function
+
+This function samples the input sequence in equal size buckets. That is, according to the downsampling ratio and downsampling method given by the user, the input sequence is equally divided into several buckets according to a fixed number of points, and sampled by the given sampling method within each bucket.
+
+- `proportion`: sample ratio, the value range is `(0, 1]`.
+- four sampling methods: 
+    - `equal_size_bucket_random_sample`
+    - `equal_size_bucket_agg_sample`
+    - `equal_size_bucket_m4_sample`
+    - `equal_size_bucket_outlier_sample`
+
+#### Equal Size Bucket Random Sample
+
+Random sampling is performed on the equally divided buckets.
+
+| Function Name                   | Allowed Input Series Data Types | Required Attributes                                          | Output Series Data Type        | Series Data Type  Description                                |
+| ------------------------------- | ------------------------------- | ------------------------------------------------------------ | ------------------------------ | ------------------------------------------------------------ |
+| EQUAL_SIZE_BUCKET_RANDOM_SAMPLE | INT32 / INT64 / FLOAT / DOUBLE  | `proportion` The value range is `(0, 1]`, the default is `0.1` | INT32 / INT64 / FLOAT / DOUBLE | Returns a random sample of equal buckets that matches the sampling ratio |
+
+##### Example
+
+Example data: `root.ln.wf01.wt01.temperature` has a total of `100` ordered data from `0.0-99.0`.
+
+```sql
+IoTDB> select temperature from root.ln.wf01.wt01;
++-----------------------------+-----------------------------+
+|                         Time|root.ln.wf01.wt01.temperature|
++-----------------------------+-----------------------------+
+|1970-01-01T08:00:00.000+08:00|                          0.0|
+|1970-01-01T08:00:00.001+08:00|                          1.0|
+|1970-01-01T08:00:00.002+08:00|                          2.0|
+|1970-01-01T08:00:00.003+08:00|                          3.0|
+|1970-01-01T08:00:00.004+08:00|                          4.0|
+|1970-01-01T08:00:00.005+08:00|                          5.0|
+|1970-01-01T08:00:00.006+08:00|                          6.0|
+|1970-01-01T08:00:00.007+08:00|                          7.0|
+|1970-01-01T08:00:00.008+08:00|                          8.0|
+|1970-01-01T08:00:00.009+08:00|                          9.0|
+|1970-01-01T08:00:00.010+08:00|                         10.0|
+|1970-01-01T08:00:00.011+08:00|                         11.0|
+|1970-01-01T08:00:00.012+08:00|                         12.0|
+|.............................|.............................|            
+|1970-01-01T08:00:00.089+08:00|                         89.0|
+|1970-01-01T08:00:00.090+08:00|                         90.0|
+|1970-01-01T08:00:00.091+08:00|                         91.0|
+|1970-01-01T08:00:00.092+08:00|                         92.0|
+|1970-01-01T08:00:00.093+08:00|                         93.0|
+|1970-01-01T08:00:00.094+08:00|                         94.0|
+|1970-01-01T08:00:00.095+08:00|                         95.0|
+|1970-01-01T08:00:00.096+08:00|                         96.0|
+|1970-01-01T08:00:00.097+08:00|                         97.0|
+|1970-01-01T08:00:00.098+08:00|                         98.0|
+|1970-01-01T08:00:00.099+08:00|                         99.0|
++-----------------------------+-----------------------------+
+```
+
+Sql:
+
+```sql
+select equal_size_bucket_random_sample(temperature,'proportion'='0.1') as random_sample from root.ln.wf01.wt01;
+```
+
+Result:
+
+```sql
++-----------------------------+-------------+
+|                         Time|random_sample|
++-----------------------------+-------------+
+|1970-01-01T08:00:00.007+08:00|          7.0|
+|1970-01-01T08:00:00.014+08:00|         14.0|
+|1970-01-01T08:00:00.020+08:00|         20.0|
+|1970-01-01T08:00:00.035+08:00|         35.0|
+|1970-01-01T08:00:00.047+08:00|         47.0|
+|1970-01-01T08:00:00.059+08:00|         59.0|
+|1970-01-01T08:00:00.063+08:00|         63.0|
+|1970-01-01T08:00:00.079+08:00|         79.0|
+|1970-01-01T08:00:00.086+08:00|         86.0|
+|1970-01-01T08:00:00.096+08:00|         96.0|
++-----------------------------+-------------+
+Total line number = 10
+It costs 0.024s
+```
+
+#### Equal Size Bucket Aggregation Sample
+
+The input sequence is sampled by the aggregation sampling method, and the user needs to provide an additional aggregation function parameter, namely
+
+- `type`: Aggregate type, which can be `avg` or `max` or `min` or `sum` or `extreme` or `variance`. By default, `avg` is used. `extreme` represents the value with the largest absolute value in the equal bucket. `variance` represents the variance in the sampling equal buckets.
+
+The timestamp of the sampling output of each bucket is the timestamp of the first point of the bucket.
+
+| Function Name                | Allowed Input Series Data Types | Required Attributes                                          | Output Series Data Type        | Series Data Type  Description                                |
+| ---------------------------- | ------------------------------- | ------------------------------------------------------------ | ------------------------------ | ------------------------------------------------------------ |
+| EQUAL_SIZE_BUCKET_AGG_SAMPLE | INT32 / INT64 / FLOAT / DOUBLE  | `proportion` The value range is `(0, 1]`, the default is `0.1`<br>`type`: The value types are `avg`, `max`, `min`, `sum`, `extreme`, `variance`, the default is `avg` | INT32 / INT64 / FLOAT / DOUBLE | Returns equal bucket aggregation samples that match the sampling ratio |
+
+##### Example
+
+Example data: `root.ln.wf01.wt01.temperature` has a total of `100` ordered data from `0.0-99.0`, and the test data is randomly sampled in equal buckets.
+
+Sql:
+
+```sql
+select equal_size_bucket_agg_sample(temperature, 'type'='avg','proportion'='0.1') as agg_avg, equal_size_bucket_agg_sample(temperature, 'type'='max','proportion'='0.1') as agg_max, equal_size_bucket_agg_sample(temperature,'type'='min','proportion'='0.1') as agg_min, equal_size_bucket_agg_sample(temperature, 'type'='sum','proportion'='0.1') as agg_sum, equal_size_bucket_agg_sample(temperature, 'type'='extreme','proportion'='0.1') as agg_extreme, equal_size_bucket_agg_sample(temperature, ' [...]
+```
+
+Result:
+
+```sql
++-----------------------------+-----------------+-------+-------+-------+-----------+------------+
+|                         Time|          agg_avg|agg_max|agg_min|agg_sum|agg_extreme|agg_variance|
++-----------------------------+-----------------+-------+-------+-------+-----------+------------+
+|1970-01-01T08:00:00.000+08:00|              4.5|    9.0|    0.0|   45.0|        9.0|        8.25|
+|1970-01-01T08:00:00.010+08:00|             14.5|   19.0|   10.0|  145.0|       19.0|        8.25|
+|1970-01-01T08:00:00.020+08:00|             24.5|   29.0|   20.0|  245.0|       29.0|        8.25|
+|1970-01-01T08:00:00.030+08:00|             34.5|   39.0|   30.0|  345.0|       39.0|        8.25|
+|1970-01-01T08:00:00.040+08:00|             44.5|   49.0|   40.0|  445.0|       49.0|        8.25|
+|1970-01-01T08:00:00.050+08:00|             54.5|   59.0|   50.0|  545.0|       59.0|        8.25|
+|1970-01-01T08:00:00.060+08:00|             64.5|   69.0|   60.0|  645.0|       69.0|        8.25|
+|1970-01-01T08:00:00.070+08:00|74.50000000000001|   79.0|   70.0|  745.0|       79.0|        8.25|
+|1970-01-01T08:00:00.080+08:00|             84.5|   89.0|   80.0|  845.0|       89.0|        8.25|
+|1970-01-01T08:00:00.090+08:00|             94.5|   99.0|   90.0|  945.0|       99.0|        8.25|
++-----------------------------+-----------------+-------+-------+-------+-----------+------------+
+Total line number = 10
+It costs 0.044s
+```
+
+#### Equal Size Bucket M4 Sample
+
+The input sequence is sampled using the M4 sampling method. That is to sample the head, tail, min and max values for each bucket.
+
+| Function Name               | Allowed Input Series Data Types | Required Attributes                                          | Output Series Data Type        | Series Data Type  Description                                |
+| --------------------------- | ------------------------------- | ------------------------------------------------------------ | ------------------------------ | ------------------------------------------------------------ |
+| EQUAL_SIZE_BUCKET_M4_SAMPLE | INT32 / INT64 / FLOAT / DOUBLE  | `proportion` The value range is `(0, 1]`, the default is `0.1` | INT32 / INT64 / FLOAT / DOUBLE | Returns equal bucket M4 samples that match the sampling ratio |
+
+##### Example
+
+Example data: `root.ln.wf01.wt01.temperature` has a total of `100` ordered data from `0.0-99.0`, and the test data is randomly sampled in equal buckets.
+
+Sql:
+
+```sql
+select equal_size_bucket_m4_sample(temperature, 'proportion'='0.1') as M4_sample from root.ln.wf01.wt01;
+```
+
+Result:
+
+```
++-----------------------------+---------+
+|                         Time|M4_sample|
++-----------------------------+---------+
+|1970-01-01T08:00:00.000+08:00|      0.0|
+|1970-01-01T08:00:00.001+08:00|      1.0|
+|1970-01-01T08:00:00.038+08:00|     38.0|
+|1970-01-01T08:00:00.039+08:00|     39.0|
+|1970-01-01T08:00:00.040+08:00|     40.0|
+|1970-01-01T08:00:00.041+08:00|     41.0|
+|1970-01-01T08:00:00.078+08:00|     78.0|
+|1970-01-01T08:00:00.079+08:00|     79.0|
+|1970-01-01T08:00:00.080+08:00|     80.0|
+|1970-01-01T08:00:00.081+08:00|     81.0|
+|1970-01-01T08:00:00.098+08:00|     98.0|
+|1970-01-01T08:00:00.099+08:00|     99.0|
++-----------------------------+---------+
+Total line number = 12
+It costs 0.065s
+```
+
+#### Equal Size Bucket Outlier Sample
+
+This function samples the input sequence with equal number of bucket outliers, that is, according to the downsampling ratio given by the user and the number of samples in the bucket, the input sequence is divided into several buckets according to a fixed number of points. Sampling by the given outlier sampling method within each bucket.
+
+| Function Name                    | Allowed Input Series Data Types | Required Attributes                                          | Output Series Data Type        | Series Data Type  Description                                |
+| -------------------------------- | ------------------------------- | ------------------------------------------------------------ | ------------------------------ | ------------------------------------------------------------ |
+| EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE | INT32 / INT64 / FLOAT / DOUBLE  | The value range of `proportion` is `(0, 1]`, the default is `0.1`<br> The value of `type` is `avg` or `stendis` or `cos` or `prenextdis`, the default is `avg` <br>The value of `number` should be greater than 0, the default is `3` | INT32 / INT64 / FLOAT / DOUBLE | Returns outlier samples in equal buckets that match the sampling ratio and the number of samples in the bucket |
+
+Parameter Description
+
+- `proportion`: sampling ratio
+- `number`: the number of samples in each bucket, default `3`
+- `type`: outlier sampling method, the value is
+    - `avg`: Take the average of the data points in the bucket, and find the `top number` farthest from the average according to the sampling ratio
+    - `stendis`: Take the vertical distance between each data point in the bucket and the first and last data points of the bucket to form a straight line, and according to the sampling ratio, find the `top number` with the largest distance
+    - `cos`: Set a data point in the bucket as b, the data point on the left of b as a, and the data point on the right of b as c, then take the cosine value of the angle between the ab and bc vectors. The larger the angle, the more likely it is an outlier. Find the `top number` with the smallest cos value
+    - `prenextdis`: Let a data point in the bucket be b, the data point to the left of b is a, and the data point to the right of b is c, then take the sum of the lengths of ab and bc as the yardstick, the larger the sum, the more likely it is to be an outlier, and find the `top number` with the largest sum value
+
+##### Example
+
+Example data: `root.ln.wf01.wt01.temperature` has a total of `100` ordered data from `0.0-99.0`. Among them, in order to add outliers, we make the number modulo 5 equal to 0 increment by 100.
+
+```sql
+IoTDB> select temperature from root.ln.wf01.wt01;
++-----------------------------+-----------------------------+
+|                         Time|root.ln.wf01.wt01.temperature|
++-----------------------------+-----------------------------+
+|1970-01-01T08:00:00.000+08:00|                          0.0|
+|1970-01-01T08:00:00.001+08:00|                          1.0|
+|1970-01-01T08:00:00.002+08:00|                          2.0|
+|1970-01-01T08:00:00.003+08:00|                          3.0|
+|1970-01-01T08:00:00.004+08:00|                          4.0|
+|1970-01-01T08:00:00.005+08:00|                        105.0|
+|1970-01-01T08:00:00.006+08:00|                          6.0|
+|1970-01-01T08:00:00.007+08:00|                          7.0|
+|1970-01-01T08:00:00.008+08:00|                          8.0|
+|1970-01-01T08:00:00.009+08:00|                          9.0|
+|1970-01-01T08:00:00.010+08:00|                         10.0|
+|1970-01-01T08:00:00.011+08:00|                         11.0|
+|1970-01-01T08:00:00.012+08:00|                         12.0|
+|1970-01-01T08:00:00.013+08:00|                         13.0|
+|1970-01-01T08:00:00.014+08:00|                         14.0|
+|1970-01-01T08:00:00.015+08:00|                        115.0|
+|1970-01-01T08:00:00.016+08:00|                         16.0|
+|.............................|.............................|
+|1970-01-01T08:00:00.092+08:00|                         92.0|
+|1970-01-01T08:00:00.093+08:00|                         93.0|
+|1970-01-01T08:00:00.094+08:00|                         94.0|
+|1970-01-01T08:00:00.095+08:00|                        195.0|
+|1970-01-01T08:00:00.096+08:00|                         96.0|
+|1970-01-01T08:00:00.097+08:00|                         97.0|
+|1970-01-01T08:00:00.098+08:00|                         98.0|
+|1970-01-01T08:00:00.099+08:00|                         99.0|
++-----------------------------+-----------------------------+
+```
+
+Sql:
+
+```sql
+select equal_size_bucket_outlier_sample(temperature, 'proportion'='0.1', 'type'='avg', 'number'='2') as outlier_avg_sample, equal_size_bucket_outlier_sample(temperature, 'proportion'='0.1', 'type'='stendis', 'number'='2') as outlier_stendis_sample, equal_size_bucket_outlier_sample(temperature, 'proportion'='0.1', 'type'='cos', 'number'='2') as outlier_cos_sample, equal_size_bucket_outlier_sample(temperature, 'proportion'='0.1', 'type'='prenextdis', 'number'='2') as outlier_prenextdis_sam [...]
+```
+
+Result:
+
+```sql
++-----------------------------+------------------+----------------------+------------------+-------------------------+
+|                         Time|outlier_avg_sample|outlier_stendis_sample|outlier_cos_sample|outlier_prenextdis_sample|
++-----------------------------+------------------+----------------------+------------------+-------------------------+
+|1970-01-01T08:00:00.005+08:00|             105.0|                 105.0|             105.0|                    105.0|
+|1970-01-01T08:00:00.015+08:00|             115.0|                 115.0|             115.0|                    115.0|
+|1970-01-01T08:00:00.025+08:00|             125.0|                 125.0|             125.0|                    125.0|
+|1970-01-01T08:00:00.035+08:00|             135.0|                 135.0|             135.0|                    135.0|
+|1970-01-01T08:00:00.045+08:00|             145.0|                 145.0|             145.0|                    145.0|
+|1970-01-01T08:00:00.055+08:00|             155.0|                 155.0|             155.0|                    155.0|
+|1970-01-01T08:00:00.065+08:00|             165.0|                 165.0|             165.0|                    165.0|
+|1970-01-01T08:00:00.075+08:00|             175.0|                 175.0|             175.0|                    175.0|
+|1970-01-01T08:00:00.085+08:00|             185.0|                 185.0|             185.0|                    185.0|
+|1970-01-01T08:00:00.095+08:00|             195.0|                 195.0|             195.0|                    195.0|
++-----------------------------+------------------+----------------------+------------------+-------------------------+
+Total line number = 10
+It costs 0.041s
+```
+
+### M4 Function
+
+M4 is used to sample the `first, last, bottom, top` points for each sliding window:
+
+-   the first point is the point with the **m**inimal time;
+-   the last point is the point with the **m**aximal time;
+-   the bottom point is the point with the **m**inimal value (if there are multiple such points, M4 returns one of them);
+-   the top point is the point with the **m**aximal value (if there are multiple such points, M4 returns one of them).
+
+<img src="https://alioss.timecho.com/docs/img/github/198178733-a0919d17-0663-4672-9c4f-1efad6f463c2.png" alt="image" style="zoom:50%;" />
+
+| Function Name | Allowed Input Series Data Types | Attributes                                                   | Output Series Data Type        | Series Data Type  Description                                |
+| ------------- | ------------------------------- | ------------------------------------------------------------ | ------------------------------ | ------------------------------------------------------------ |
+| M4            | INT32 / INT64 / FLOAT / DOUBLE  | Different attributes used by the size window and the time window. The size window uses attributes `windowSize` and `slidingStep`. The time window uses attributes `timeInterval`, `slidingStep`, `displayWindowBegin`, and `displayWindowEnd`. More details see below. | INT32 / INT64 / FLOAT / DOUBLE | Returns the `first, last, bottom, top` points in each sliding window. M4 sorts and deduplicates the aggregated points within the window before [...]
+
+#### Attributes
+
+**(1) Attributes for the size window:**
+
++ `windowSize`: The number of points in a window. Int data type. **Required**.
++ `slidingStep`: Slide a window by the number of points. Int data type. Optional. If not set, default to the same as `windowSize`.
+
+<img src="https://alioss.timecho.com/docs/img/github/198181449-00d563c8-7bce-4ecd-a031-ec120ca42c3f.png" alt="image" style="zoom: 50%;" />
+
+**(2) Attributes for the time window:**
+
++ `timeInterval`: The time interval length of a window. Long data type. **Required**.
++ `slidingStep`: Slide a window by the time length. Long data type. Optional. If not set, default to the same as `timeInterval`.
++ `displayWindowBegin`: The starting position of the window (included). Long data type. Optional. If not set, default to Long.MIN_VALUE, meaning using the time of the first data point of the input time series as the starting position of the window.
++ `displayWindowEnd`: End time limit (excluded, essentially playing the same role as `WHERE time < displayWindowEnd`). Long data type. Optional. If not set, default to Long.MAX_VALUE, meaning there is no additional end time limit other than the end of the input time series itself.
+
+<img src="https://alioss.timecho.com/docs/img/github/198183015-93b56644-3330-4acf-ae9e-d718a02b5f4c.png" alt="groupBy window" style="zoom: 67%;" />
+
+#### Examples
+
+Input series:
+
+```sql
++-----------------------------+------------------+
+|                         Time|root.vehicle.d1.s1|
++-----------------------------+------------------+
+|1970-01-01T08:00:00.001+08:00|               5.0|
+|1970-01-01T08:00:00.002+08:00|              15.0|
+|1970-01-01T08:00:00.005+08:00|              10.0|
+|1970-01-01T08:00:00.008+08:00|               8.0|
+|1970-01-01T08:00:00.010+08:00|              30.0|
+|1970-01-01T08:00:00.020+08:00|              20.0|
+|1970-01-01T08:00:00.025+08:00|               8.0|
+|1970-01-01T08:00:00.027+08:00|              20.0|
+|1970-01-01T08:00:00.030+08:00|              40.0|
+|1970-01-01T08:00:00.033+08:00|               9.0|
+|1970-01-01T08:00:00.035+08:00|              10.0|
+|1970-01-01T08:00:00.040+08:00|              20.0|
+|1970-01-01T08:00:00.045+08:00|              30.0|
+|1970-01-01T08:00:00.052+08:00|               8.0|
+|1970-01-01T08:00:00.054+08:00|              18.0|
++-----------------------------+------------------+
+```
+
+SQL for query1:
+
+```sql
+select M4(s1,'timeInterval'='25','displayWindowBegin'='0','displayWindowEnd'='100') from root.vehicle.d1
+```
+
+Output1:
+
+```sql
++-----------------------------+-----------------------------------------------------------------------------------------------+
+|                         Time|M4(root.vehicle.d1.s1, "timeInterval"="25", "displayWindowBegin"="0", "displayWindowEnd"="100")|
++-----------------------------+-----------------------------------------------------------------------------------------------+
+|1970-01-01T08:00:00.001+08:00|                                                                                            5.0|
+|1970-01-01T08:00:00.010+08:00|                                                                                           30.0|
+|1970-01-01T08:00:00.020+08:00|                                                                                           20.0|
+|1970-01-01T08:00:00.025+08:00|                                                                                            8.0|
+|1970-01-01T08:00:00.030+08:00|                                                                                           40.0|
+|1970-01-01T08:00:00.045+08:00|                                                                                           30.0|
+|1970-01-01T08:00:00.052+08:00|                                                                                            8.0|
+|1970-01-01T08:00:00.054+08:00|                                                                                           18.0|
++-----------------------------+-----------------------------------------------------------------------------------------------+
+Total line number = 8
+```
+
+SQL for query2:
+
+```sql
+select M4(s1,'windowSize'='10') from root.vehicle.d1
+```
+
+Output2:
+
+```sql
++-----------------------------+-----------------------------------------+
+|                         Time|M4(root.vehicle.d1.s1, "windowSize"="10")|
++-----------------------------+-----------------------------------------+
+|1970-01-01T08:00:00.001+08:00|                                      5.0|
+|1970-01-01T08:00:00.030+08:00|                                     40.0|
+|1970-01-01T08:00:00.033+08:00|                                      9.0|
+|1970-01-01T08:00:00.035+08:00|                                     10.0|
+|1970-01-01T08:00:00.045+08:00|                                     30.0|
+|1970-01-01T08:00:00.052+08:00|                                      8.0|
+|1970-01-01T08:00:00.054+08:00|                                     18.0|
++-----------------------------+-----------------------------------------+
+Total line number = 7
+```
+
+#### Suggested Use Cases
+
+**(1) Use Case: Extreme-point-preserving downsampling**
+
+As M4 aggregation selects the `first, last, bottom, top` points for each window, M4 usually preserves extreme points and thus patterns better than other downsampling methods such as Piecewise Aggregate Approximation (PAA). Therefore, if you want to downsample the time series while preserving extreme points, you may give M4 a try.
+
+**(2) Use case: Error-free two-color line chart visualization of large-scale time series through M4 downsampling**
+
+Referring to paper ["M4: A Visualization-Oriented Time Series Data Aggregation"](http://www.vldb.org/pvldb/vol7/p797-jugel.pdf), M4 is a downsampling method to facilitate large-scale time series visualization without deforming the shape in terms of a two-color line chart.
+
+Given a chart of `w*h` pixels, suppose that the visualization time range of the time series is `[tqs,tqe)` and (tqe-tqs) is divisible by w, the points that fall within the  `i`-th time span `Ii=[tqs+(tqe-tqs)/w*(i-1),tqs+(tqe-tqs)/w*i)` will be drawn on the `i`-th pixel column, i=1,2,...,w. Therefore, from a visualization-driven perspective, use the sql: `"select M4(s1,'timeInterval'='(tqe-tqs)/w','displayWindowBegin'='tqs','displayWindowEnd'='tqe') from root.vehicle.d1"` to sample the ` [...]
+
+To eliminate the hassle of hardcoding parameters, we recommend the following usage of Grafana's [template variable](https://grafana.com/docs/grafana/latest/dashboards/variables/add-template-variables/#global-variables) `$__interval_ms` when Grafana is used for visualization:
+
+```
+select M4(s1,'timeInterval'='$__interval_ms') from root.sg1.d1
+```
+
+where `timeInterval` is set as `(tqe-tqs)/w` automatically. Note that the time precision here is assumed to be milliseconds.
+
+#### Comparison with Other Functions
+
+| SQL                                                          | Whether support M4 aggregation                               | Sliding window type                               | Example                                                      | Docs                                                         |
+| ------------------------------------------------------------ | ------------------------------------------------------------ | ------------------------------------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ |
+| 1. native built-in aggregate functions with Group By clause  | No. Lack `BOTTOM_TIME` and `TOP_TIME`, which are respectively the time of the points that have the mininum and maximum value. | Time Window                                       | `select count(status), max_value(temperature) from root.ln.wf01.wt01 group by ([2017-11-01 00:00:00, 2017-11-07 23:00:00), 3h, 1d)` | https://iotdb.apache.org/UserGuide/Master/Query-Data/Aggregate-Query.html#built-in-aggregate-functions <br />http [...]
+| 2. EQUAL_SIZE_BUCKET_M4_SAMPLE (built-in UDF)                | Yes*                                                         | Size Window. `windowSize = 4*(int)(1/proportion)` | `select equal_size_bucket_m4_sample(temperature, 'proportion'='0.1') as M4_sample from root.ln.wf01.wt01` | https://iotdb.apache.org/UserGuide/Master/Query-Data/Select-Expression.html#time-series-generating-functions |
+| **3. M4 (built-in UDF)**                                     | Yes*                                                         | Size Window, Time Window                          | (1) Size Window: `select M4(s1,'windowSize'='10') from root.vehicle.d1` <br />(2) Time Window: `select M4(s1,'timeInterval'='25','displayWindowBegin'='0','displayWindowEnd'='100') from root.vehicle.d1` | refer to this doc                                            |
+| 4. extend native built-in aggregate functions with Group By clause to support M4 aggregation | not implemented                                              | not implemented                                   | not implemented                                              | not implemented                                              |
+
+Further compare `EQUAL_SIZE_BUCKET_M4_SAMPLE` and `M4`:
+
+**(1) Different M4 aggregation definition:**
+
+For each window, `EQUAL_SIZE_BUCKET_M4_SAMPLE` extracts the top and bottom points from points **EXCLUDING** the first and last points.
+
+In contrast, `M4` extracts the top and bottom points from points **INCLUDING** the first and last points, which is more consistent with the semantics of `max_value` and `min_value` stored in metadata.
+
+It is worth noting that both functions sort and deduplicate the aggregated points in a window before outputting them to the collectors.
+
+**(2) Different sliding windows:** 
+
+`EQUAL_SIZE_BUCKET_M4_SAMPLE` uses SlidingSizeWindowAccessStrategy and **indirectly** controls sliding window size by sampling proportion. The conversion formula is `windowSize = 4*(int)(1/proportion)`. 
+
+`M4` supports two types of sliding window: SlidingSizeWindowAccessStrategy and SlidingTimeWindowAccessStrategy. `M4` **directly** controls the window point size or time length using corresponding parameters.
+
 ### User Defined Timeseries Generating Functions
 
 Please refer to [UDF (User Defined Function)](../Process-Data/UDF-User-Defined-Function.md).
@@ -557,7 +958,7 @@ expression
 
 IoTDB supports the calculation of arbitrary nested expressions consisting of **numbers, time series, time series generating functions (including user-defined functions) and arithmetic expressions** in the `select` clause.
 
-##### Example
+#### Example
 
 Input1:
 
@@ -661,13 +1062,13 @@ Total line number = 9
 It costs 0.014s
 ```
 
-##### Explanation
+#### Explanation
 
 - Only when the left operand and the right operand under a certain timestamp are not `null`, the nested expressions will have an output value. Otherwise this row will not be included in the result. 
   - In Result1 of the Example part, the value of time series `root.sg.a` at time 40 is 4, while the value of time series `root.sg.b` is `null`. So at time 40, the value of nested expressions `(a + b) * 2 + sin(a)` is `null`. So in Result2, this row is not included in the result.
 - If one operand in the nested expressions can be translated into multiple time series (For example, `*`), the result of each time series will be included in the result (Cartesian product). Please refer to Input3, Input4 and corresponding Result3 and Result4 in Example.
 
-##### Note
+#### Note
 
 > Please note that Aligned Time Series has not been supported in Nested Expressions with Time Series Query yet. An error message is expected if you use it with Aligned Time Series selected in a query statement.
 
@@ -675,7 +1076,7 @@ It costs 0.014s
 
 IoTDB supports the calculation of arbitrary nested expressions consisting of **numbers, aggregations and arithmetic expressions** in the `select` clause.
 
-##### Example
+#### Example
 
 Aggregation query without `GROUP BY`.
 
@@ -755,12 +1156,12 @@ Total line number = 8
 It costs 0.012s
 ```
 
-##### Explanation
+#### Explanation
 
 - Only when the left operand and the right operand under a certain timestamp are not `null`, the nested expressions will have an output value. Otherwise this row will not be included in the result. But for nested expressions with `GROUP BY` clause, it is better to show the result of all time intervals. Please refer to Input3 and corresponding Result3 in Example.
 - If one operand in the nested expressions can be translated into multiple time series (For example, `*`), the result of each time series will be included in the result (Cartesian product). Please refer to Input2 and corresponding Result2 in Example.
 
-##### Note
+#### Note
 
 > Automated fill (`FILL`) and grouped by level (`GROUP BY LEVEL`) are not supported in an aggregation query with expression nested. They may be supported in future versions.
 >
diff --git a/docs/UserGuide/UDF-Library/M4.md b/docs/UserGuide/UDF-Library/M4.md
deleted file mode 100644
index 420cefde3c..0000000000
--- a/docs/UserGuide/UDF-Library/M4.md
+++ /dev/null
@@ -1,92 +0,0 @@
-<!--
-
-​    Licensed to the Apache Software Foundation (ASF) under one
-​    or more contributor license agreements.  See the NOTICE file
-​    distributed with this work for additional information
-​    regarding copyright ownership.  The ASF licenses this file
-​    to you under the Apache License, Version 2.0 (the
-​    "License"); you may not use this file except in compliance
-​    with the License.  You may obtain a copy of the License at
-​    
-​        http://www.apache.org/licenses/LICENSE-2.0
-​    
-​    Unless required by applicable law or agreed to in writing,
-​    software distributed under the License is distributed on an
-​    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-​    KIND, either express or implied.  See the License for the
-​    specific language governing permissions and limitations
-​    under the License.
-
--->
-
-# M4
-
-## M4
-
-### Usage
-
-This function is used to execute the M4 aggregation query using the MAC (merging all chunks) approach.
-
-**Name:** M4
-
-**Input Series:** Only supports a single input sequence, the type is INT32 / INT64 / FLOAT / DOUBLE
-
-**Parameters:**
-
-+ `tqs`: The start time (included) of the query time range.
-+ `tqe`: The end time (excluded) of the query time range.
-+ `w`: The number of time spans in the M4 aggregation.
-
-**Output Series:** The first, last, bottom, and top points in each time span
-[tqs+(tqe-tqs)/w*(i-1),tqs+(tqe-tqs)/w*i), i=1,...,w.
-
-**Note:**
-+ This function is right now only supported in the branch `research/M4-visualization` right now.
-+ You need to make sure (tqe-tqs) is divisible by w.
-+ You need to add `time>=tqs and time<tqe` in the where clause.
-
-### Examples
-
-Input series:
-
-```
-+-----------------------------+------------------+
-|                         Time|root.vehicle.d0.s0|
-+-----------------------------+------------------+
-|1970-01-01T08:00:00.001+08:00|               5.0|
-|1970-01-01T08:00:00.002+08:00|              15.0|
-|1970-01-01T08:00:00.005+08:00|              10.0|
-|1970-01-01T08:00:00.008+08:00|               8.0|
-|1970-01-01T08:00:00.010+08:00|              20.0|
-|1970-01-01T08:00:00.020+08:00|              20.0|
-|1970-01-01T08:00:00.025+08:00|               8.0|
-|1970-01-01T08:00:00.027+08:00|              20.0|
-|1970-01-01T08:00:00.030+08:00|              40.0|
-|1970-01-01T08:00:00.033+08:00|               9.0|
-|1970-01-01T08:00:00.035+08:00|              10.0|
-|1970-01-01T08:00:00.040+08:00|              20.0|
-|1970-01-01T08:00:00.045+08:00|              30.0|
-|1970-01-01T08:00:00.052+08:00|               8.0|
-|1970-01-01T08:00:00.054+08:00|              18.0|
-|1970-01-01T08:00:00.120+08:00|               8.0|
-+-----------------------------+------------------+
-```
-
-SQL for query:
-
-```sql
-select M4(s0,'tqs'='0','tqe'='100','w'='4') from root.vehicle.d0 where time>=0 and time<100
-```
-
-Output:
-
-```
-+-----------------------------+----------------------------------------------------------------------------------+
-|                         Time|                           M4(root.vehicle.d0.s0, "tqs"="0", "tqe"="100", "w"="4")|
-+-----------------------------+----------------------------------------------------------------------------------+
-|1970-01-01T08:00:00.000+08:00|  FirstPoint=(1,5.0), LastPoint=(20,20.0), BottomPoint=(1,5.0), TopPoint=(10,20.0)|
-|1970-01-01T08:00:00.025+08:00|FirstPoint=(25,8.0), LastPoint=(45,30.0), BottomPoint=(25,8.0), TopPoint=(30,40.0)|
-|1970-01-01T08:00:00.050+08:00|FirstPoint=(52,8.0), LastPoint=(54,18.0), BottomPoint=(52,8.0), TopPoint=(54,18.0)|
-|1970-01-01T08:00:00.075+08:00|                                                                             empty|
-+-----------------------------+----------------------------------------------------------------------------------+
-```
\ No newline at end of file
diff --git a/docs/zh/UserGuide/Query-Data/Select-Expression.md b/docs/zh/UserGuide/Query-Data/Select-Expression.md
index 8855e07abc..5c8aba16ea 100644
--- a/docs/zh/UserGuide/Query-Data/Select-Expression.md
+++ b/docs/zh/UserGuide/Query-Data/Select-Expression.md
@@ -19,9 +19,9 @@
 
 -->
 
-## 选择表达式
+# 选择表达式
 
-### 语法定义
+## 语法定义
 
 选择表达式(`selectExpr`)是 SELECT 子句的组成单元,每个 `selectExpr` 对应查询结果集中的一列,其语法定义如下:
 
@@ -58,13 +58,13 @@ selectExpr
   - 聚合查询嵌套表达式
 - 数字常量(仅用于表达式)
 
-### 算数运算查询
+## 算数运算查询
 
 > 请注意,目前对齐时间序列(Aligned Timeseries)尚不支持算数运算查询。算数运算所选时间序列包含对齐时间序列时,会提示错误。
 
-#### 运算符
+### 运算符
 
-##### 一元算数运算符
+#### 一元算数运算符
 
 支持的运算符:`+`, `-`
 
@@ -72,7 +72,7 @@ selectExpr
 
 输出数据类型:与输入数据类型一致
 
-##### 二元算数运算符
+#### 二元算数运算符
 
 支持的运算符:`+`, `-`, `*`, `/`,  `%`
 
@@ -82,7 +82,7 @@ selectExpr
 
 注意:当某个时间戳下左操作数和右操作数都不为空(`null`)时,二元运算操作才会有输出结果
 
-#### 使用示例
+### 示例
 
 例如:
 
@@ -106,7 +106,7 @@ Total line number = 5
 It costs 0.014s
 ```
 
-### 内置时间序列生成函数
+## 内置时间序列生成函数
 
 时间序列生成函数可接受若干原始时间序列作为输入,产生一列时间序列输出。与聚合函数不同的是,时间序列生成函数的结果集带有时间戳列。
 
@@ -114,7 +114,7 @@ It costs 0.014s
 
 > 请注意,目前对齐时间序列(Aligned Timeseries)尚不支持内置函数查询。使用内置函数时,如果自变量包含对齐时间序列,会提示错误。
 
-#### 数学函数
+### 数学函数
 
 目前 IoTDB 支持下列数学函数,这些数学函数的行为与这些函数在 Java Math 标准库中对应实现的行为一致。
 
@@ -163,7 +163,7 @@ Total line number = 5
 It costs 0.008s
 ```
 
-#### 字符串函数
+### 字符串函数
 
 目前 IoTDB 支持下列字符串处理函数:
 
@@ -192,7 +192,7 @@ Total line number = 3
 It costs 0.007s
 ```
 
-#### 选择函数
+### 选择函数
 
 目前 IoTDB 支持如下选择函数:
 
@@ -223,7 +223,7 @@ Total line number = 5
 It costs 0.006s
 ```
 
-#### 趋势计算函数
+### 趋势计算函数
 
 目前 IoTDB 支持如下趋势计算函数:
 
@@ -257,7 +257,7 @@ Total line number = 5
 It costs 0.014s
 ```
 
-#### 常序列生成函数
+### 常序列生成函数
 
 常序列生成函数用于生成所有数据点的值都相同的时间序列。
 
@@ -294,7 +294,7 @@ Total line number = 5
 It costs 0.005s
 ```
 
-#### 数据类型转换函数
+### 数据类型转换函数
 
 当前IoTDB支持6种数据类型,其中包括INT32、INT64、FLOAT、DOUBLE、BOOLEAN以及TEXT。当我们对数据进行查询或者计算时可能需要进行数据类型的转换, 比如说将TEXT转换为INT32,或者提高数据精度,比如说将FLOAT转换为DOUBLE。所以,IoTDB支持使用cast函数对数据类型进行转换。  
 
@@ -302,7 +302,7 @@ It costs 0.005s
 | ------ | ------------------------------------------------------------ | ------------------------ | ---------------------------------- |
 | CAST   | `type`:输出的数据点的类型,只能是 INT32 / INT64 / FLOAT / DOUBLE / BOOLEAN / TEXT | 由输入属性参数`type`决定 | 将数据转换为`type`参数指定的类型。 |
 
-##### 类型转换说明
+#### 类型转换说明
 
 1.当INT32、INT64类型的值不为0时,FLOAT与DOUBLE类型的值不为0.0时,TEXT类型不为空字符串或者"false"时,转换为BOOLEAN类型时值为true,否则为false。
 
@@ -376,7 +376,7 @@ It costs 0.009s
 
 
 
-##### 演示
+#### 示例
 测试数据:
 ```
 IoTDB> select text from root.test;
@@ -407,7 +407,7 @@ Total line number = 4
 It costs 0.078s
 ```
 
-#### 条件函数
+### 条件函数
 条件函数针对每个数据点进行条件判断,返回布尔值
 
 目前IoTDB支持以下条件函数
@@ -430,7 +430,7 @@ IoTDB> select ts from root.test;
 +-----------------------------+------------+
 ```
 
-##### 测试1
+#### 示例1
 
 SQL语句:
 ```sql
@@ -450,7 +450,7 @@ IoTDB> select ts, on_off(ts, 'threshold'='2') from root.test;
 +-----------------------------+------------+-------------------------------------+
 ```
 
-##### 测试2
+#### 示例2
 Sql语句:
 ```sql
 select ts, in_range(ts, 'lower'='2', 'upper'='3.1') from root.test;
@@ -469,7 +469,7 @@ IoTDB> select ts, in_range(ts, 'lower'='2', 'upper'='3.1') from root.test;
 +-----------------------------+------------+--------------------------------------------------+
 ```
 
-#### 连续满足区间函数
+### 连续满足区间函数
 连续满足条件区间函数用来查询所有满足指定条件的连续区间。
 按返回值可分为两类:
   1. 返回满足条件连续区间的起始时间戳和时间跨度(时间跨度为0表示此处只有起始时间这一个数据点满足条件)
@@ -482,7 +482,7 @@ IoTDB> select ts, in_range(ts, 'lower'='2', 'upper'='3.1') from root.test;
 | ZERO_COUNT        | INT32/ INT64/ FLOAT/ DOUBLE/ BOOLEAN | `min`:可选,默认值1<br>`max`:可选,默认值`Long.MAX_VALUE` | Long  | 返回时间序列连续为0(false)的开始时间与其后数据点的个数,数据点个数n满足`n >= min && n <= max`   |               |
 | NON_ZERO_COUNT    | INT32/ INT64/ FLOAT/ DOUBLE/ BOOLEAN | `min`:可选,默认值1<br>`max`:可选,默认值`Long.MAX_VALUE` | Long  | 返回时间序列连续不为0(false)的开始时间与其后数据点的个数,数据点个数n满足`n >= min && n <= max`  |               |
 
-##### 演示
+#### 示例
 测试数据:
 ```
 IoTDB> select s1,s2,s3,s4,s5 from root.sg.d2;
@@ -519,7 +519,413 @@ select s1, zero_count(s1), non_zero_count(s2), zero_duration(s3), non_zero_durat
 +-----------------------------+-------------+-------------------------+-----------------------------+----------------------------+--------------------------------+
 ```
 
-#### 自定义时间序列生成函数
+###  等数量分桶降采样函数
+
+本函数对输入序列进行等数量分桶采样,即根据用户给定的降采样比例和降采样方法将输入序列按固定点数等分为若干桶,然后在每个桶内通过给定的采样方法进行采样。
+
+- `proportion`:降采样比例,取值范围为`(0, 1]`.
+- 四种降采样方法: 
+    - `equal_size_bucket_random_sample`:等数量分桶随机采样
+    - `equal_size_bucket_agg_sample`:等数量分桶聚合采样
+    - `equal_size_bucket_m4_sample`:等数量分桶M4采样
+    - `equal_size_bucket_outlier_sample`:等数量分桶离群值采样
+
+####  等数量分桶随机采样
+
+对等数量分桶后,桶内进行随机采样。
+
+| 函数名                          | 可接收的输入序列类型           | 必要的属性参数                                           | 输出序列类型                   | 功能类型                         |
+| ------------------------------- | ------------------------------ | -------------------------------------------------------- | ------------------------------ | -------------------------------- |
+| EQUAL_SIZE_BUCKET_RANDOM_SAMPLE | INT32 / INT64 / FLOAT / DOUBLE | 降采样比例 `proportion`,取值范围为`(0, 1]`,默认为`0.1` | INT32 / INT64 / FLOAT / DOUBLE | 返回符合采样比例的等分桶随机采样 |
+
+#####  示例
+
+输入序列:`root.ln.wf01.wt01.temperature`从`0.0-99.0`共`100`条数据。
+
+```sql
+IoTDB> select temperature from root.ln.wf01.wt01;
++-----------------------------+-----------------------------+
+|                         Time|root.ln.wf01.wt01.temperature|
++-----------------------------+-----------------------------+
+|1970-01-01T08:00:00.000+08:00|                          0.0|
+|1970-01-01T08:00:00.001+08:00|                          1.0|
+|1970-01-01T08:00:00.002+08:00|                          2.0|
+|1970-01-01T08:00:00.003+08:00|                          3.0|
+|1970-01-01T08:00:00.004+08:00|                          4.0|
+|1970-01-01T08:00:00.005+08:00|                          5.0|
+|1970-01-01T08:00:00.006+08:00|                          6.0|
+|1970-01-01T08:00:00.007+08:00|                          7.0|
+|1970-01-01T08:00:00.008+08:00|                          8.0|
+|1970-01-01T08:00:00.009+08:00|                          9.0|
+|1970-01-01T08:00:00.010+08:00|                         10.0|
+|1970-01-01T08:00:00.011+08:00|                         11.0|
+|1970-01-01T08:00:00.012+08:00|                         12.0|
+|.............................|.............................|            
+|1970-01-01T08:00:00.089+08:00|                         89.0|
+|1970-01-01T08:00:00.090+08:00|                         90.0|
+|1970-01-01T08:00:00.091+08:00|                         91.0|
+|1970-01-01T08:00:00.092+08:00|                         92.0|
+|1970-01-01T08:00:00.093+08:00|                         93.0|
+|1970-01-01T08:00:00.094+08:00|                         94.0|
+|1970-01-01T08:00:00.095+08:00|                         95.0|
+|1970-01-01T08:00:00.096+08:00|                         96.0|
+|1970-01-01T08:00:00.097+08:00|                         97.0|
+|1970-01-01T08:00:00.098+08:00|                         98.0|
+|1970-01-01T08:00:00.099+08:00|                         99.0|
++-----------------------------+-----------------------------+
+```
+
+sql:
+
+```sql
+select equal_size_bucket_random_sample(temperature,'proportion'='0.1') as random_sample from root.ln.wf01.wt01;
+```
+
+结果:
+
+```sql
++-----------------------------+-------------+
+|                         Time|random_sample|
++-----------------------------+-------------+
+|1970-01-01T08:00:00.007+08:00|          7.0|
+|1970-01-01T08:00:00.014+08:00|         14.0|
+|1970-01-01T08:00:00.020+08:00|         20.0|
+|1970-01-01T08:00:00.035+08:00|         35.0|
+|1970-01-01T08:00:00.047+08:00|         47.0|
+|1970-01-01T08:00:00.059+08:00|         59.0|
+|1970-01-01T08:00:00.063+08:00|         63.0|
+|1970-01-01T08:00:00.079+08:00|         79.0|
+|1970-01-01T08:00:00.086+08:00|         86.0|
+|1970-01-01T08:00:00.096+08:00|         96.0|
++-----------------------------+-------------+
+Total line number = 10
+It costs 0.024s
+```
+
+####  等数量分桶聚合采样
+
+采用聚合采样法对输入序列进行采样,用户需要另外提供一个聚合函数参数即
+
+- `type`:聚合类型,取值为`avg`或`max`或`min`或`sum`或`extreme`或`variance`。在缺省情况下,采用`avg`。其中`extreme`表示等分桶中,绝对值最大的值。`variance`表示采样等分桶中的方差。
+
+每个桶采样输出的时间戳为这个桶第一个点的时间戳
+
+
+| 函数名                       | 可接收的输入序列类型           | 必要的属性参数                                               | 输出序列类型                   | 功能类型                         |
+| ---------------------------- | ------------------------------ | ------------------------------------------------------------ | ------------------------------ | -------------------------------- |
+| EQUAL_SIZE_BUCKET_AGG_SAMPLE | INT32 / INT64 / FLOAT / DOUBLE | `proportion`取值范围为`(0, 1]`,默认为`0.1`<br>`type`:取值类型有`avg`, `max`, `min`, `sum`, `extreme`, `variance`, 默认为`avg` | INT32 / INT64 / FLOAT / DOUBLE | 返回符合采样比例的等分桶聚合采样 |
+
+#####  示例
+
+输入序列:`root.ln.wf01.wt01.temperature`从`0.0-99.0`共`100`条有序数据,同等分桶随机采样的测试数据。
+
+sql:
+
+```sql
+select equal_size_bucket_agg_sample(temperature, 'type'='avg','proportion'='0.1') as agg_avg, equal_size_bucket_agg_sample(temperature, 'type'='max','proportion'='0.1') as agg_max, equal_size_bucket_agg_sample(temperature,'type'='min','proportion'='0.1') as agg_min, equal_size_bucket_agg_sample(temperature, 'type'='sum','proportion'='0.1') as agg_sum, equal_size_bucket_agg_sample(temperature, 'type'='extreme','proportion'='0.1') as agg_extreme, equal_size_bucket_agg_sample(temperature, ' [...]
+```
+
+结果:
+
+```sql
++-----------------------------+-----------------+-------+-------+-------+-----------+------------+
+|                         Time|          agg_avg|agg_max|agg_min|agg_sum|agg_extreme|agg_variance|
++-----------------------------+-----------------+-------+-------+-------+-----------+------------+
+|1970-01-01T08:00:00.000+08:00|              4.5|    9.0|    0.0|   45.0|        9.0|        8.25|
+|1970-01-01T08:00:00.010+08:00|             14.5|   19.0|   10.0|  145.0|       19.0|        8.25|
+|1970-01-01T08:00:00.020+08:00|             24.5|   29.0|   20.0|  245.0|       29.0|        8.25|
+|1970-01-01T08:00:00.030+08:00|             34.5|   39.0|   30.0|  345.0|       39.0|        8.25|
+|1970-01-01T08:00:00.040+08:00|             44.5|   49.0|   40.0|  445.0|       49.0|        8.25|
+|1970-01-01T08:00:00.050+08:00|             54.5|   59.0|   50.0|  545.0|       59.0|        8.25|
+|1970-01-01T08:00:00.060+08:00|             64.5|   69.0|   60.0|  645.0|       69.0|        8.25|
+|1970-01-01T08:00:00.070+08:00|74.50000000000001|   79.0|   70.0|  745.0|       79.0|        8.25|
+|1970-01-01T08:00:00.080+08:00|             84.5|   89.0|   80.0|  845.0|       89.0|        8.25|
+|1970-01-01T08:00:00.090+08:00|             94.5|   99.0|   90.0|  945.0|       99.0|        8.25|
++-----------------------------+-----------------+-------+-------+-------+-----------+------------+
+Total line number = 10
+It costs 0.044s
+```
+
+####  等数量分桶M4采样
+
+采用M4采样法对输入序列进行采样。即对于每个桶采样首、尾、最小和最大值。
+
+| 函数名                      | 可接收的输入序列类型           | 必要的属性参数                              | 输出序列类型                   | 功能类型                       |
+| --------------------------- | ------------------------------ | ------------------------------------------- | ------------------------------ | ------------------------------ |
+| EQUAL_SIZE_BUCKET_M4_SAMPLE | INT32 / INT64 / FLOAT / DOUBLE | `proportion`取值范围为`(0, 1]`,默认为`0.1` | INT32 / INT64 / FLOAT / DOUBLE | 返回符合采样比例的等分桶M4采样 |
+
+#####  示例
+
+输入序列:`root.ln.wf01.wt01.temperature`从`0.0-99.0`共`100`条有序数据,同等分桶随机采样的测试数据。
+
+sql:
+
+```sql
+select equal_size_bucket_m4_sample(temperature, 'proportion'='0.1') as M4_sample from root.ln.wf01.wt01;
+```
+
+结果:
+
+```sql
++-----------------------------+---------+
+|                         Time|M4_sample|
++-----------------------------+---------+
+|1970-01-01T08:00:00.000+08:00|      0.0|
+|1970-01-01T08:00:00.001+08:00|      1.0|
+|1970-01-01T08:00:00.038+08:00|     38.0|
+|1970-01-01T08:00:00.039+08:00|     39.0|
+|1970-01-01T08:00:00.040+08:00|     40.0|
+|1970-01-01T08:00:00.041+08:00|     41.0|
+|1970-01-01T08:00:00.078+08:00|     78.0|
+|1970-01-01T08:00:00.079+08:00|     79.0|
+|1970-01-01T08:00:00.080+08:00|     80.0|
+|1970-01-01T08:00:00.081+08:00|     81.0|
+|1970-01-01T08:00:00.098+08:00|     98.0|
+|1970-01-01T08:00:00.099+08:00|     99.0|
++-----------------------------+---------+
+Total line number = 12
+It costs 0.065s
+```
+
+####  等数量分桶离群值采样
+
+本函数对输入序列进行等数量分桶离群值采样,即根据用户给定的降采样比例和桶内采样个数将输入序列按固定点数等分为若干桶,在每个桶内通过给定的离群值采样方法进行采样。
+
+| 函数名                           | 可接收的输入序列类型           | 必要的属性参数                                               | 输出序列类型                   | 功能类型                                         |
+| -------------------------------- | ------------------------------ | ------------------------------------------------------------ | ------------------------------ | ------------------------------------------------ |
+| EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE | INT32 / INT64 / FLOAT / DOUBLE | `proportion`取值范围为`(0, 1]`,默认为`0.1`<br>`type`取值为`avg`或`stendis`或`cos`或`prenextdis`,默认为`avg`<br>`number`取值应大于0,默认`3` | INT32 / INT64 / FLOAT / DOUBLE | 返回符合采样比例和桶内采样个数的等分桶离群值采样 |
+
+参数说明
+
+- `proportion`: 采样比例
+    - `number`: 每个桶内的采样个数,默认`3`
+- `type`: 离群值采样方法,取值为
+    - `avg`: 取桶内数据点的平均值,并根据采样比例,找到距离均值最远的`top number`个
+    - `stendis`: 取桶内每一个数据点距离桶的首末数据点连成直线的垂直距离,并根据采样比例,找到距离最大的`top number`个
+    - `cos`: 设桶内一个数据点为b,b左边的数据点为a,b右边的数据点为c,则取ab与bc向量的夹角的余弦值,值越小,说明形成的角度越大,越可能是异常值。找到cos值最小的`top number`个
+    - `prenextdis`: 设桶内一个数据点为b,b左边的数据点为a,b右边的数据点为c,则取ab与bc的长度之和作为衡量标准,和越大越可能是异常值,找到最大的`top number`个
+
+#####  示例
+
+测试数据:`root.ln.wf01.wt01.temperature`从`0.0-99.0`共`100`条数据,其中为了加入离群值,我们使得个位数为5的值自增100。
+
+```sql
+IoTDB> select temperature from root.ln.wf01.wt01;
++-----------------------------+-----------------------------+
+|                         Time|root.ln.wf01.wt01.temperature|
++-----------------------------+-----------------------------+
+|1970-01-01T08:00:00.000+08:00|                          0.0|
+|1970-01-01T08:00:00.001+08:00|                          1.0|
+|1970-01-01T08:00:00.002+08:00|                          2.0|
+|1970-01-01T08:00:00.003+08:00|                          3.0|
+|1970-01-01T08:00:00.004+08:00|                          4.0|
+|1970-01-01T08:00:00.005+08:00|                        105.0|
+|1970-01-01T08:00:00.006+08:00|                          6.0|
+|1970-01-01T08:00:00.007+08:00|                          7.0|
+|1970-01-01T08:00:00.008+08:00|                          8.0|
+|1970-01-01T08:00:00.009+08:00|                          9.0|
+|1970-01-01T08:00:00.010+08:00|                         10.0|
+|1970-01-01T08:00:00.011+08:00|                         11.0|
+|1970-01-01T08:00:00.012+08:00|                         12.0|
+|1970-01-01T08:00:00.013+08:00|                         13.0|
+|1970-01-01T08:00:00.014+08:00|                         14.0|
+|1970-01-01T08:00:00.015+08:00|                        115.0|
+|1970-01-01T08:00:00.016+08:00|                         16.0|
+|.............................|.............................|
+|1970-01-01T08:00:00.092+08:00|                         92.0|
+|1970-01-01T08:00:00.093+08:00|                         93.0|
+|1970-01-01T08:00:00.094+08:00|                         94.0|
+|1970-01-01T08:00:00.095+08:00|                        195.0|
+|1970-01-01T08:00:00.096+08:00|                         96.0|
+|1970-01-01T08:00:00.097+08:00|                         97.0|
+|1970-01-01T08:00:00.098+08:00|                         98.0|
+|1970-01-01T08:00:00.099+08:00|                         99.0|
++-----------------------------+-----------------------------+
+```
+
+sql:
+
+```sql
+select equal_size_bucket_outlier_sample(temperature, 'proportion'='0.1', 'type'='avg', 'number'='2') as outlier_avg_sample, equal_size_bucket_outlier_sample(temperature, 'proportion'='0.1', 'type'='stendis', 'number'='2') as outlier_stendis_sample, equal_size_bucket_outlier_sample(temperature, 'proportion'='0.1', 'type'='cos', 'number'='2') as outlier_cos_sample, equal_size_bucket_outlier_sample(temperature, 'proportion'='0.1', 'type'='prenextdis', 'number'='2') as outlier_prenextdis_sam [...]
+```
+
+结果:
+
+```sql
++-----------------------------+------------------+----------------------+------------------+-------------------------+
+|                         Time|outlier_avg_sample|outlier_stendis_sample|outlier_cos_sample|outlier_prenextdis_sample|
++-----------------------------+------------------+----------------------+------------------+-------------------------+
+|1970-01-01T08:00:00.005+08:00|             105.0|                 105.0|             105.0|                    105.0|
+|1970-01-01T08:00:00.015+08:00|             115.0|                 115.0|             115.0|                    115.0|
+|1970-01-01T08:00:00.025+08:00|             125.0|                 125.0|             125.0|                    125.0|
+|1970-01-01T08:00:00.035+08:00|             135.0|                 135.0|             135.0|                    135.0|
+|1970-01-01T08:00:00.045+08:00|             145.0|                 145.0|             145.0|                    145.0|
+|1970-01-01T08:00:00.055+08:00|             155.0|                 155.0|             155.0|                    155.0|
+|1970-01-01T08:00:00.065+08:00|             165.0|                 165.0|             165.0|                    165.0|
+|1970-01-01T08:00:00.075+08:00|             175.0|                 175.0|             175.0|                    175.0|
+|1970-01-01T08:00:00.085+08:00|             185.0|                 185.0|             185.0|                    185.0|
+|1970-01-01T08:00:00.095+08:00|             195.0|                 195.0|             195.0|                    195.0|
++-----------------------------+------------------+----------------------+------------------+-------------------------+
+Total line number = 10
+It costs 0.041s
+```
+
+### M4函数
+
+####  函数简介
+
+M4用于在窗口内采样第一个点(`first`)、最后一个点(`last`)、最小值点(`bottom`)、最大值点(`top`):
+
+-   第一个点是拥有这个窗口内最小时间戳的点;
+-   最后一个点是拥有这个窗口内最大时间戳的点;
+-   最小值点是拥有这个窗口内最小值的点(如果有多个这样的点,M4只返回其中一个);
+-   最大值点是拥有这个窗口内最大值的点(如果有多个这样的点,M4只返回其中一个)。
+
+<img src="https://alioss.timecho.com/docs/img/github/198178733-a0919d17-0663-4672-9c4f-1efad6f463c2.png" alt="image" style="zoom:50%;" />
+
+| 函数名 | 可接收的输入序列类型           | 属性参数                                                     | 输出序列类型                   | 功能类型                                                     |
+| ------ | ------------------------------ | ------------------------------------------------------------ | ------------------------------ | ------------------------------------------------------------ |
+| M4     | INT32 / INT64 / FLOAT / DOUBLE | 包含固定点数的窗口和滑动时间窗口使用不同的属性参数。包含固定点数的窗口使用属性`windowSize`和`slidingStep`。滑动时间窗口使用属性`timeInterval`、`slidingStep`、`displayWindowBegin`和`displayWindowEnd`。更多细节见下文。 | INT32 / INT64 / FLOAT / DOUBLE | 返回每个窗口内的第一个点(`first`)、最后一个点(`last`)、最小值点(`bottom`)、最大值点(`top`)。在一个窗口内的聚合点输出之前,M4会将它们按照时间戳递增排序并且去重。 |
+
+####  属性参数
+
+**(1) 包含固定点数的窗口(SlidingSizeWindowAccessStrategy)使用的属性参数:**
+
++ `windowSize`: 一个窗口内的点数。Int数据类型。必需的属性参数。
++ `slidingStep`: 按照设定的点数来滑动窗口。Int数据类型。可选的属性参数;如果没有设置,默认取值和`windowSize`一样。
+
+<img src="https://alioss.timecho.com/docs/img/github/198181449-00d563c8-7bce-4ecd-a031-ec120ca42c3f.png" alt="image" style="zoom: 50%;" />
+
+**(2) 滑动时间窗口(SlidingTimeWindowAccessStrategy)使用的属性参数:**
+
++ `timeInterval`: 一个窗口的时间长度。Long数据类型。必需的属性参数。
++ `slidingStep`: 按照设定的时长来滑动窗口。Long数据类型。可选的属性参数;如果没有设置,默认取值和`timeInterval`一样。
++ `displayWindowBegin`: 窗口滑动的起始时间戳位置(包含在内)。Long数据类型。可选的属性参数;如果没有设置,默认取值为Long.MIN_VALUE,意为使用输入的时间序列的第一个点的时间戳作为窗口滑动的起始时间戳位置。
++ `displayWindowEnd`: 结束时间限制(不包含在内;本质上和`WHERE time < displayWindowEnd`起的效果是一样的)。Long数据类型。可选的属性参数;如果没有设置,默认取值为Long.MAX_VALUE,意为除了输入的时间序列自身数据读取完毕之外没有增加额外的结束时间过滤条件限制。
+
+<img src="https://alioss.timecho.com/docs/img/github/198183015-93b56644-3330-4acf-ae9e-d718a02b5f4c.png" alt="groupBy window" style="zoom: 67%;" />
+
+####  示例
+
+输入的时间序列:
+
+```sql
++-----------------------------+------------------+
+|                         Time|root.vehicle.d1.s1|
++-----------------------------+------------------+
+|1970-01-01T08:00:00.001+08:00|               5.0|
+|1970-01-01T08:00:00.002+08:00|              15.0|
+|1970-01-01T08:00:00.005+08:00|              10.0|
+|1970-01-01T08:00:00.008+08:00|               8.0|
+|1970-01-01T08:00:00.010+08:00|              30.0|
+|1970-01-01T08:00:00.020+08:00|              20.0|
+|1970-01-01T08:00:00.025+08:00|               8.0|
+|1970-01-01T08:00:00.027+08:00|              20.0|
+|1970-01-01T08:00:00.030+08:00|              40.0|
+|1970-01-01T08:00:00.033+08:00|               9.0|
+|1970-01-01T08:00:00.035+08:00|              10.0|
+|1970-01-01T08:00:00.040+08:00|              20.0|
+|1970-01-01T08:00:00.045+08:00|              30.0|
+|1970-01-01T08:00:00.052+08:00|               8.0|
+|1970-01-01T08:00:00.054+08:00|              18.0|
++-----------------------------+------------------+
+```
+
+查询语句1:
+
+```sql
+select M4(s1,'timeInterval'='25','displayWindowBegin'='0','displayWindowEnd'='100') from root.vehicle.d1
+```
+
+输出结果1:
+
+```sql
++-----------------------------+-----------------------------------------------------------------------------------------------+
+|                         Time|M4(root.vehicle.d1.s1, "timeInterval"="25", "displayWindowBegin"="0", "displayWindowEnd"="100")|
++-----------------------------+-----------------------------------------------------------------------------------------------+
+|1970-01-01T08:00:00.001+08:00|                                                                                            5.0|
+|1970-01-01T08:00:00.010+08:00|                                                                                           30.0|
+|1970-01-01T08:00:00.020+08:00|                                                                                           20.0|
+|1970-01-01T08:00:00.025+08:00|                                                                                            8.0|
+|1970-01-01T08:00:00.030+08:00|                                                                                           40.0|
+|1970-01-01T08:00:00.045+08:00|                                                                                           30.0|
+|1970-01-01T08:00:00.052+08:00|                                                                                            8.0|
+|1970-01-01T08:00:00.054+08:00|                                                                                           18.0|
++-----------------------------+-----------------------------------------------------------------------------------------------+
+Total line number = 8
+```
+
+查询语句2:
+
+```sql
+select M4(s1,'windowSize'='10') from root.vehicle.d1
+```
+
+输出结果2:
+
+```sql
++-----------------------------+-----------------------------------------+
+|                         Time|M4(root.vehicle.d1.s1, "windowSize"="10")|
++-----------------------------+-----------------------------------------+
+|1970-01-01T08:00:00.001+08:00|                                      5.0|
+|1970-01-01T08:00:00.030+08:00|                                     40.0|
+|1970-01-01T08:00:00.033+08:00|                                      9.0|
+|1970-01-01T08:00:00.035+08:00|                                     10.0|
+|1970-01-01T08:00:00.045+08:00|                                     30.0|
+|1970-01-01T08:00:00.052+08:00|                                      8.0|
+|1970-01-01T08:00:00.054+08:00|                                     18.0|
++-----------------------------+-----------------------------------------+
+Total line number = 7
+```
+
+####  推荐的使用场景
+
+**(1) 使用场景:保留极端点的降采样**
+
+由于M4为每个窗口聚合其第一个点(`first`)、最后一个点(`last`)、最小值点(`bottom`)、最大值点(`top`),因此M4通常保留了极值点,因此比其他下采样方法(如分段聚合近似 (PAA))能更好地保留模式。如果你想对时间序列进行下采样并且希望保留极值点,你可以试试 M4。
+
+**(2) 使用场景:基于M4降采样的大规模时间序列的零误差双色折线图可视化**
+
+参考论文["M4: A Visualization-Oriented Time Series Data Aggregation"](http://www.vldb.org/pvldb/vol7/p797-jugel.pdf),作为大规模时间序列可视化的降采样方法,M4可以做到双色折线图的零变形。
+
+假设屏幕画布的像素宽乘高是`w*h`,假设时间序列要可视化的时间范围是`[tqs,tqe)`,并且(tqe-tqs)是w的整数倍,那么落在第i个时间跨度`Ii=[tqs+(tqe-tqs)/w*(i-1),tqs+(tqe-tqs)/w*i)` 内的点将会被画在第i个像素列中,i=1,2,...,w。于是从可视化驱动的角度出发,使用查询语句:`"select M4(s1,'timeInterval'='(tqe-tqs)/w','displayWindowBegin'='tqs','displayWindowEnd'='tqe') from root.vehicle.d1"`,来采集每个时间跨度内的第一个点(`first`)、最后一个点(`last`)、最小值点(`bottom`)、最大值点(`top`)。降采样时间序列的结果点数不会超过`4*w`个,与此同时,使用这些聚合点画出来的二色折线图与使用原始数据画出来的在像素级别上是完全一致的。
+
+为了免除参数值硬编码的麻烦,当Grafana用于可视化时,我们推荐使用Grafana的[模板变量](https://grafana.com/docs/grafana/latest/dashboards/variables/add-template-variables/#global-variables)`$ __interval_ms`,如下所示:
+
+```sql
+select M4(s1,'timeInterval'='$__interval_ms') from root.sg1.d1
+```
+
+其中`timeInterval`自动设置为`(tqe-tqs)/w`。请注意,这里的时间精度假定为毫秒。
+
+####  和其它函数的功能比较
+
+| SQL                                               | 是否支持M4聚合                                               | 滑动窗口类型                                      | 示例                                                         | 相关文档                                                     |
+| ------------------------------------------------- | ------------------------------------------------------------ | ------------------------------------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ |
+| 1. 带有Group By子句的内置聚合函数                 | 不支持,缺少`BOTTOM_TIME`和`TOP_TIME`,即缺少最小值点和最大值点的时间戳。 | Time Window                                       | `select count(status), max_value(temperature) from root.ln.wf01.wt01 group by ([2017-11-01 00:00:00, 2017-11-07 23:00:00), 3h, 1d)` | https://iotdb.apache.org/UserGuide/Master/Query-Data/Aggregate-Query.html#built-in-aggregate-functions <br />https://iotdb.apache.org/UserGuide/Master/Query-Data/Aggregate-Query.html#downsampling-aggregate-query |
+| 2. EQUAL_SIZE_BUCKET_M4_SAMPLE (内置UDF)          | 支持*                                                        | Size Window. `windowSize = 4*(int)(1/proportion)` | `select equal_size_bucket_m4_sample(temperature, 'proportion'='0.1') as M4_sample from root.ln.wf01.wt01` | https://iotdb.apache.org/UserGuide/Master/Query-Data/Select-Expression.html#time-series-generating-functions |
+| **3. M4 (内置UDF)**                               | 支持*                                                        | Size Window, Time Window                          | (1) Size Window: `select M4(s1,'windowSize'='10') from root.vehicle.d1` <br />(2) Time Window: `select M4(s1,'timeInterval'='25','displayWindowBegin'='0','displayWindowEnd'='100') from root.vehicle.d1` | 本文档                                                       |
+| 4. 扩展带有Group By子句的内置聚合函数来支持M4聚合 | 未实施                                                       | 未实施                                            | 未实施                                                       | 未实施                                                       |
+
+进一步比较`EQUAL_SIZE_BUCKET_M4_SAMPLE`和`M4`:
+
+**(1) 不同的M4聚合函数定义:**
+
+在每个窗口内,`EQUAL_SIZE_BUCKET_M4_SAMPLE`从排除了第一个点和最后一个点之后剩余的点中提取最小值点和最大值点。
+
+而`M4`则是从窗口内所有点中(包括第一个点和最后一个点)提取最小值点和最大值点,这个定义与元数据中保存的`max_value`和`min_value`的语义更加一致。
+
+值得注意的是,在一个窗口内的聚合点输出之前,`EQUAL_SIZE_BUCKET_M4_SAMPLE`和`M4`都会将它们按照时间戳递增排序并且去重。
+
+**(2) 不同的滑动窗口:**
+
+`EQUAL_SIZE_BUCKET_M4_SAMPLE`使用SlidingSizeWindowAccessStrategy,并且通过采样比例(`proportion`)来间接控制窗口点数(`windowSize`),转换公式是`windowSize = 4*(int)(1/proportion)`。
+
+`M4`支持两种滑动窗口:SlidingSizeWindowAccessStrategy和SlidingTimeWindowAccessStrategy,并且`M4`通过相应的参数直接控制窗口的点数或者时长。
+
+
+
+### 自定义时间序列生成函数
 
 请参考 [UDF](../Process-Data/UDF-User-Defined-Function.md)。
 
@@ -527,7 +933,7 @@ select s1, zero_count(s1), non_zero_count(s2), zero_duration(s3), non_zero_durat
 
 + [IoTDB-Quality](https://thulab.github.io/iotdb-quality),一个关于数据质量的 UDF 库实现,包括数据画像、数据质量评估与修复等一系列函数。
 
-### 嵌套表达式
+## 嵌套表达式
 
 IoTDB支持嵌套表达式,由于聚合查询和时间序列查询不能在一条查询语句中同时出现,我们将支持的嵌套表达式分为时间序列查询嵌套表达式和聚合查询嵌套表达式两类。
 
@@ -553,11 +959,11 @@ expression
     ;
 ```
 
-#### 时间序列查询嵌套表达式
+### 时间序列查询嵌套表达式
 
 IoTDB 支持在 `SELECT` 子句中计算由**数字常量,时间序列、时间序列生成函数(包括用户自定义函数)和算数运算表达式**组成的任意嵌套表达式。
 
-##### 示例
+#### 示例
 
 输入1:
 
@@ -661,21 +1067,21 @@ Total line number = 9
 It costs 0.014s
 ```
 
-##### 说明
+#### 说明
 
 - 当某个时间戳下左操作数和右操作数都不为空(`null`)时,表达式才会有结果,否则表达式值为`null`,且默认不出现在结果集中。
   - 从结果集1中可知,时间戳40下,时间序列`root.sg.a`的值为4,`root.sg.b`的值为`null`。所以在时间戳40下,表达式`(a + b) * 2 + sin(a)`的值为`null`,从结果集2可以看出,时间戳40没有出现。
 - 如果表达式中某个操作数对应多条时间序列(如通配符`*`),那么每条时间序列对应的结果都会出现在结果集中(按照笛卡尔积形式)。请参考示例中输入3、4和对应结果集3、4。
 
-##### 注意
+#### 注意
 
 > 目前对齐时间序列(Aligned Timeseries)尚不支持时间序列查询嵌套表达式。使用时间序列查询嵌套表达式时,如果操作数包含对齐时间序列,会提示错误。
 
-#### 聚合查询嵌套表达式
+### 聚合查询嵌套表达式
 
 IoTDB 支持在 `SELECT` 子句中计算由**数字常量,聚合查询和算数运算表达式**组成的任意嵌套表达式。
 
-##### 示例
+#### 示例
 
 不指定 `GROUP BY` 的聚合查询。
 
@@ -755,12 +1161,12 @@ Total line number = 8
 It costs 0.012s
 ```
 
-##### 说明
+#### 说明
 
 - 当某个时间戳下左操作数和右操作数都不为空(`null`)时,表达式才会有结果,否则表达式值为`null`,且默认不出现在结果集中。但在使用`GROUP BY`子句的聚合查询嵌套表达式中,我们希望保留每个时间窗口的值,所以表达式值为`null`的窗口也包含在结果集中。请参考输入3及结果集3。
 - 如果表达式中某个操作数对应多条时间序列(如通配符`*`),那么每条时间序列对应的结果都会出现在结果集中(按照笛卡尔积形式)。请参考输入2及结果集2。
 
-##### 注意
+#### 注意
 
 > 目前此功能尚不支持填充算子(`FILL`)和按层级聚合(`GROUP BY LEVEL`)查询, 在后续版本会支持。
 >
@@ -774,7 +1180,7 @@ It costs 0.012s
 > SELECT avg(s1) + avg(s2) FROM root.sg.d1 GROUP BY([0, 10000), 1s) FILL(previous); -- 空值填充 
 > ```
 
-### 使用别名
+## 使用别名
 
 由于 IoTDB 独特的数据模型,在每个传感器前都附带有设备等诸多额外信息。有时,我们只针对某个具体设备查询,而这些前缀信息频繁显示造成了冗余,影响了结果集的显示与分析。这时我们可以使用 IoTDB 提供的 AS 函数,将查询中出现的时间序列给定一个别名。
 
diff --git a/docs/zh/UserGuide/UDF-Library/M4.md b/docs/zh/UserGuide/UDF-Library/M4.md
deleted file mode 100644
index e5645e8503..0000000000
--- a/docs/zh/UserGuide/UDF-Library/M4.md
+++ /dev/null
@@ -1,93 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-    
-        http://www.apache.org/licenses/LICENSE-2.0
-    
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-
-## M4
-
-### M4
-
-#### 函数简介
-
-本函数使用 MAC (merging all chunks) 方法执行M4聚合查询。
-
-**函数名:** M4
-
-**输入序列:** 仅支持单个输入序列,类型为 INT32 / INT64 / FLOAT / DOUBLE。
-
-**参数:**
-
-+ `tqs`: 查询的开始时间(含)。
-+ `tqe`: 查询的结束时间(不含)。
-+ `w`: M4聚合中的时间跨度数量。
-
-**输出序列:** 每个时间跨度的首、尾、最小和最大值。
-
-`[tqs+(tqe-tqs)/w*(i-1),tqs+(tqe-tqs)/w*i), i=1,...,w.`
-
-**说明:**
-+ 函数当前仅适用于 `research/M4-visualization` 分支。
-+ 输入参数需确保 `(tqe-tqs)` 是 `w` 的倍数。
-+ 查询时需在 where 语句后添加 `time>=tqs and time<tqe`。
-
-#### 使用示例
-
-输入序列:
-
-```
-+-----------------------------+------------------+
-|                         Time|root.vehicle.d0.s0|
-+-----------------------------+------------------+
-|1970-01-01T08:00:00.001+08:00|               5.0|
-|1970-01-01T08:00:00.002+08:00|              15.0|
-|1970-01-01T08:00:00.005+08:00|              10.0|
-|1970-01-01T08:00:00.008+08:00|               8.0|
-|1970-01-01T08:00:00.010+08:00|              20.0|
-|1970-01-01T08:00:00.020+08:00|              20.0|
-|1970-01-01T08:00:00.025+08:00|               8.0|
-|1970-01-01T08:00:00.027+08:00|              20.0|
-|1970-01-01T08:00:00.030+08:00|              40.0|
-|1970-01-01T08:00:00.033+08:00|               9.0|
-|1970-01-01T08:00:00.035+08:00|              10.0|
-|1970-01-01T08:00:00.040+08:00|              20.0|
-|1970-01-01T08:00:00.045+08:00|              30.0|
-|1970-01-01T08:00:00.052+08:00|               8.0|
-|1970-01-01T08:00:00.054+08:00|              18.0|
-|1970-01-01T08:00:00.120+08:00|               8.0|
-+-----------------------------+------------------+
-```
-
-用于查询的 SQL 语句:
-
-```sql
-select M4(s0,'tqs'='0','tqe'='100','w'='4') from root.vehicle.d0 where time>=0 and time<100
-```
-
-输出序列:
-
-```
-+-----------------------------+----------------------------------------------------------------------------------+
-|                         Time|                           M4(root.vehicle.d0.s0, "tqs"="0", "tqe"="100", "w"="4")|
-+-----------------------------+----------------------------------------------------------------------------------+
-|1970-01-01T08:00:00.000+08:00|  FirstPoint=(1,5.0), LastPoint=(20,20.0), BottomPoint=(1,5.0), TopPoint=(10,20.0)|
-|1970-01-01T08:00:00.025+08:00|FirstPoint=(25,8.0), LastPoint=(45,30.0), BottomPoint=(25,8.0), TopPoint=(30,40.0)|
-|1970-01-01T08:00:00.050+08:00|FirstPoint=(52,8.0), LastPoint=(54,18.0), BottomPoint=(52,8.0), TopPoint=(54,18.0)|
-|1970-01-01T08:00:00.075+08:00|                                                                             empty|
-+-----------------------------+----------------------------------------------------------------------------------+
-```
\ No newline at end of file
diff --git a/example/client-cpp-example/pom.xml b/example/client-cpp-example/pom.xml
index f82dd708b5..1eb25fa8c1 100644
--- a/example/client-cpp-example/pom.xml
+++ b/example/client-cpp-example/pom.xml
@@ -84,7 +84,7 @@
             <properties>
                 <cmake.generator>Visual Studio 16 2019</cmake.generator>
                 <cmake.root.dir>${project.parent.basedir}/../compile-tools/thrift/target/cmake-${cmake-version}-win64-x64/</cmake.root.dir>
-                <boost.include.dir />
+                <boost.include.dir/>
             </properties>
         </profile>
         <profile>
diff --git a/example/trigger/pom.xml b/example/trigger/pom.xml
index 37eda173da..7c3fc28c8f 100644
--- a/example/trigger/pom.xml
+++ b/example/trigger/pom.xml
@@ -118,7 +118,7 @@
                                 <importOrder>
                                     <order>org.apache.iotdb,,javax,java,\#</order>
                                 </importOrder>
-                                <removeUnusedImports />
+                                <removeUnusedImports/>
                             </java>
                         </configuration>
                         <executions>
diff --git a/example/udf/pom.xml b/example/udf/pom.xml
index c54ab7e2ba..b462824fab 100644
--- a/example/udf/pom.xml
+++ b/example/udf/pom.xml
@@ -118,7 +118,7 @@
                                 <importOrder>
                                     <order>org.apache.iotdb,,javax,java,\#</order>
                                 </importOrder>
-                                <removeUnusedImports />
+                                <removeUnusedImports/>
                             </java>
                         </configuration>
                         <executions>
diff --git a/grafana-connector/pom.xml b/grafana-connector/pom.xml
index 4277c1075e..31ba31352b 100644
--- a/grafana-connector/pom.xml
+++ b/grafana-connector/pom.xml
@@ -170,7 +170,7 @@
                                     <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                         <resource>META-INF/spring.schemas</resource>
                                     </transformer>
-                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                     <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                         <mainClass>${start-class}</mainClass>
                                     </transformer>
diff --git a/grafana-plugin/pom.xml b/grafana-plugin/pom.xml
index 8fc6e79161..c8248fd22e 100644
--- a/grafana-plugin/pom.xml
+++ b/grafana-plugin/pom.xml
@@ -123,16 +123,16 @@
                                 <configuration>
                                     <tasks>
                                         <condition property="osFamily" value="windows">
-                                            <os family="windows" />
+                                            <os family="windows"/>
                                         </condition>
                                         <condition property="osFamily" value="unix">
-                                            <os family="unix" />
+                                            <os family="unix"/>
                                         </condition>
                                         <exec executable="C:\\Windows\\System32\\cmd.exe" osfamily="windows">
-                                            <arg line="/c backend-compile.bat" />
+                                            <arg line="/c backend-compile.bat"/>
                                         </exec>
                                         <exec executable="/bin/bash" osfamily="unix">
-                                            <arg line="-c ./backend-compile.sh" />
+                                            <arg line="-c ./backend-compile.sh"/>
                                         </exec>
                                     </tasks>
                                 </configuration>
diff --git a/integration/pom.xml b/integration/pom.xml
index a33f3d289b..46fdfd9575 100644
--- a/integration/pom.xml
+++ b/integration/pom.xml
@@ -80,7 +80,7 @@
             <id>LocalStandalone</id>
             <properties>
                 <test.includedGroups>org.apache.iotdb.itbase.category.LocalStandaloneTest</test.includedGroups>
-                <test.excludedGroups />
+                <test.excludedGroups/>
             </properties>
             <activation>
                 <activeByDefault>true</activeByDefault>
@@ -142,7 +142,7 @@
             <id>Remote</id>
             <properties>
                 <test.includedGroups>org.apache.iotdb.itbase.category.RemoteTest</test.includedGroups>
-                <test.excludedGroups />
+                <test.excludedGroups/>
             </properties>
             <activation>
                 <activeByDefault>false</activeByDefault>
@@ -206,7 +206,7 @@
             <id>Cluster</id>
             <properties>
                 <test.includedGroups>org.apache.iotdb.itbase.category.ClusterTest</test.includedGroups>
-                <test.excludedGroups />
+                <test.excludedGroups/>
             </properties>
             <activation>
                 <activeByDefault>false</activeByDefault>
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBUDTFBuiltinFunctionIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBUDTFBuiltinFunctionIT.java
index 974ce5b3e4..8f72a8f043 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBUDTFBuiltinFunctionIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBUDTFBuiltinFunctionIT.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.itbase.category.ClusterTest;
 import org.apache.iotdb.itbase.category.LocalStandaloneTest;
 
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -32,7 +33,13 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Locale;
 
+import static org.apache.iotdb.db.query.udf.builtin.UDTFM4.DISPLAY_WINDOW_BEGIN_KEY;
+import static org.apache.iotdb.db.query.udf.builtin.UDTFM4.DISPLAY_WINDOW_END_KEY;
+import static org.apache.iotdb.db.query.udf.builtin.UDTFM4.SLIDING_STEP_KEY;
+import static org.apache.iotdb.db.query.udf.builtin.UDTFM4.TIME_INTERVAL_KEY;
+import static org.apache.iotdb.db.query.udf.builtin.UDTFM4.WINDOW_SIZE_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -577,4 +584,514 @@ public class IoTDBUDTFBuiltinFunctionIT {
       assertTrue(e.getMessage().contains("Upper can not be smaller than lower."));
     }
   }
+
+  @Test
+  public void testEqualBucketSampleForRandom() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("CREATE TIMESERIES root.ebs.d5.s1 with datatype=INT32,encoding=PLAIN");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+    String[] SQL_FOR_SAMPLE_S1 = new String[100];
+    for (int i = 0; i < 100; i++) {
+      SQL_FOR_SAMPLE_S1[i] =
+          String.format("insert into root.ebs.d5(time, s1) values (%d, %d)", i, i);
+    }
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      for (int i = 0; i < 100; i++) {
+        statement.execute(SQL_FOR_SAMPLE_S1[i]);
+      }
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      String functionName = "EQUAL_SIZE_BUCKET_RANDOM_SAMPLE";
+      double proportionValue = 0.1;
+      ResultSet resultSet =
+          statement.executeQuery(
+              String.format(
+                  "select " + "%s(s1, 'proportion'='%f') from root.ebs.d5",
+                  functionName, proportionValue));
+      int columnCount = resultSet.getMetaData().getColumnCount();
+      assertEquals(1 + 1, columnCount);
+      int count = 0;
+      while (resultSet.next()) {
+        count++;
+      }
+      assertEquals(10, count);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void testEqualBucketSampleForAgg() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("CREATE TIMESERIES root.ebs.d4.s1 with datatype=FLOAT,encoding=PLAIN");
+      statement.execute("CREATE TIMESERIES root.ebs.d4.s2 with datatype=DOUBLE,encoding=PLAIN");
+      statement.execute("CREATE TIMESERIES root.ebs.d4.s3 with datatype=INT64,encoding=PLAIN");
+      statement.execute("CREATE TIMESERIES root.ebs.d4.s4 with datatype=INT32,encoding=PLAIN");
+      statement.execute("CREATE TIMESERIES root.ebs.d4.s5 with datatype=INT32,encoding=PLAIN");
+      statement.execute("CREATE TIMESERIES root.ebs.d4.s6 with datatype=DOUBLE,encoding=PLAIN");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+    String[] SQL_FOR_SAMPLE_S1 = new String[100];
+    String[] SQL_FOR_SAMPLE_S2 = new String[100];
+    String[] SQL_FOR_SAMPLE_S3 = new String[100];
+    String[] SQL_FOR_SAMPLE_S4 = new String[100];
+    String[] SQL_FOR_SAMPLE_S5 = new String[100];
+    String[] SQL_FOR_SAMPLE_S6 = new String[100];
+
+    for (int i = 0; i < 100; i++) {
+      SQL_FOR_SAMPLE_S1[i] =
+          String.format("insert into root.ebs.d4(time, s1) values (%d, %f)", i, i * 1.0);
+      SQL_FOR_SAMPLE_S2[i] =
+          String.format("insert into root.ebs.d4(time, s2) values (%d, %f)", i, i * 1.0);
+      SQL_FOR_SAMPLE_S3[i] =
+          String.format("insert into root.ebs.d4(time, s3) values (%d, %d)", i, i);
+      SQL_FOR_SAMPLE_S4[i] =
+          String.format("insert into root.ebs.d4(time, s4) values (%d, %d)", i, i);
+      SQL_FOR_SAMPLE_S5[i] =
+          String.format("insert into root.ebs.d4(time, s5) values (%d, %d)", i, -i);
+      SQL_FOR_SAMPLE_S6[i] =
+          String.format("insert into root.ebs.d4(time, s6) values (%d, %f)", i, i * 1.0);
+    }
+    float[] ANSWER1 =
+        new float[] {4.5F, 14.5F, 24.5F, 34.5F, 44.5F, 54.5F, 64.5F, 74.5F, 84.5F, 94.5F};
+    double[] ANSWER2 = new double[] {0, 10, 20, 30, 40, 50, 60, 70, 80, 90};
+    long[] ANSWER3 = new long[] {9, 19, 29, 39, 49, 59, 69, 79, 89, 99};
+    long[] ANSWER4 = new long[] {45, 145, 245, 345, 445, 545, 645, 745, 845, 945};
+    int[] ANSWER5 = new int[] {-9, -19, -29, -39, -49, -59, -69, -79, -89, -99};
+    double[] ANSWER6 = new double[] {8.25, 8.25, 8.25, 8.25, 8.25, 8.25, 8.25, 8.25, 8.25, 8.25};
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      for (int i = 0; i < 100; i++) {
+        statement.execute(SQL_FOR_SAMPLE_S1[i]);
+        statement.execute(SQL_FOR_SAMPLE_S2[i]);
+        statement.execute(SQL_FOR_SAMPLE_S3[i]);
+        statement.execute(SQL_FOR_SAMPLE_S4[i]);
+        statement.execute(SQL_FOR_SAMPLE_S5[i]);
+        statement.execute(SQL_FOR_SAMPLE_S6[i]);
+      }
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      String functionName = "EQUAL_SIZE_BUCKET_AGG_SAMPLE";
+      double proportionValue = 0.1;
+      ResultSet resultSet =
+          statement.executeQuery(
+              String.format(
+                  "select "
+                      + "%s(s1, 'proportion'='%f'), "
+                      + "%s(s2, 'type'='%s', 'proportion'='%f'), "
+                      + "%s(s3, 'type'='%s', 'proportion'='%f'), "
+                      + "%s(s4, 'type'='%s', 'proportion'='%f'), "
+                      + "%s(s5, 'type'='%s', 'proportion'='%f'), "
+                      + "%s(s6, 'type'='%s', 'proportion'='%f')"
+                      + "from root.ebs.d4",
+                  functionName,
+                  proportionValue,
+                  functionName,
+                  "min",
+                  proportionValue,
+                  functionName,
+                  "max",
+                  proportionValue,
+                  functionName,
+                  "sum",
+                  proportionValue,
+                  functionName,
+                  "extreme",
+                  proportionValue,
+                  functionName,
+                  "variance",
+                  proportionValue));
+      int columnCount = resultSet.getMetaData().getColumnCount();
+      assertEquals(1 + 6, columnCount);
+      for (int i = 0; i < 10; i++) {
+        resultSet.next();
+        assertEquals(ANSWER1[i], resultSet.getDouble(2), 0.01);
+        assertEquals(ANSWER2[i], resultSet.getDouble(3), 0.01);
+        assertEquals(ANSWER3[i], resultSet.getLong(4));
+        assertEquals(ANSWER4[i], resultSet.getLong(5));
+        assertEquals(ANSWER5[i], resultSet.getInt(6));
+        assertEquals(ANSWER6[i], resultSet.getDouble(7), 0.01);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void testEqualBucketSampleForM4() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("CREATE TIMESERIES root.ebs.d3.s1 with datatype=INT32,encoding=PLAIN");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+    String[] SQL_FOR_SAMPLE = new String[100];
+    for (int i = 0; i < 100; i++) {
+      SQL_FOR_SAMPLE[i] =
+          String.format("insert into root.ebs.d3(time, s1) values (%d, %d)", i, i + 1);
+    }
+    int[] ANSWER1 = new int[] {1, 2, 39, 40, 41, 42, 79, 80, 81, 82, 99, 100};
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      for (String dataGenerationSQL : SQL_FOR_SAMPLE) {
+        statement.execute(dataGenerationSQL);
+      }
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      String functionName = "EQUAL_SIZE_BUCKET_M4_SAMPLE";
+      String methodName = "m4";
+      double proportionValue = 0.1;
+      ResultSet resultSet =
+          statement.executeQuery(
+              String.format(
+                  "select %s(s1, 'method'='%s', 'proportion'='%f') from root.ebs.d3",
+                  functionName, methodName, proportionValue));
+      int columnCount = resultSet.getMetaData().getColumnCount();
+      assertEquals(1 + 1, columnCount);
+      for (int j : ANSWER1) {
+        resultSet.next();
+        assertEquals(j, resultSet.getInt(2));
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void testEqualBucketSampleForOutlier() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("CREATE TIMESERIES root.ebs.d6.s1 with datatype=INT32,encoding=PLAIN");
+      statement.execute("CREATE TIMESERIES root.ebs.d6.s2 with datatype=INT64,encoding=PLAIN");
+      statement.execute("CREATE TIMESERIES root.ebs.d6.s3 with datatype=DOUBLE,encoding=PLAIN");
+      statement.execute("CREATE TIMESERIES root.ebs.d6.s4 with datatype=FLOAT,encoding=PLAIN");
+      statement.execute("CREATE TIMESERIES root.ebs.d6.s5 with datatype=FLOAT,encoding=PLAIN");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+    String[] SQL_FOR_SAMPLE_S1 = new String[100];
+    String[] SQL_FOR_SAMPLE_S2 = new String[100];
+    String[] SQL_FOR_SAMPLE_S3 = new String[100];
+    String[] SQL_FOR_SAMPLE_S4 = new String[100];
+    String[] SQL_FOR_SAMPLE_S5 = new String[20];
+    for (int i = 0; i < 100; i++) {
+      SQL_FOR_SAMPLE_S1[i] =
+          String.format(
+              "insert into root.ebs.d6(time, s1) values (%d, %d)",
+              i, i % 5 == 0 && i % 10 != 0 ? i + 100 : i);
+      SQL_FOR_SAMPLE_S2[i] =
+          String.format(
+              "insert into root.ebs.d6(time, s2) values (%d, %d)", i, i % 10 == 6 ? i + 100 : i);
+      if (i % 10 == 9 || i % 20 == 0) {
+        SQL_FOR_SAMPLE_S2[i] = String.format("insert into root.ebs.d6(time, s2) values (%d, 0)", i);
+      }
+      SQL_FOR_SAMPLE_S3[i] =
+          String.format(
+              "insert into root.ebs.d6(time, s3) values (%d, %d)", i, i % 10 == 7 ? i + 100 : i);
+      SQL_FOR_SAMPLE_S4[i] =
+          String.format(
+              "insert into root.ebs.d6(time, s4) values (%d, %d)", i, i % 10 == 8 ? i + 100 : i);
+    }
+    for (int i = 0; i < 20; i++) {
+      SQL_FOR_SAMPLE_S5[i] =
+          String.format("insert into root.ebs.d6(time, s5) values (%d, %d)", i, i);
+    }
+    int[] ANSWER1 = new int[] {105, 115, 125, 135, 145, 155, 165, 175, 185, 195};
+    long[] ANSWER2 = new long[] {106, 116, 126, 136, 146, 156, 166, 176, 186, 196};
+    double[] ANSWER3 = new double[] {107, 117, 127, 137, 147, 157, 167, 177, 187, 197};
+    float[] ANSWER4 = new float[] {108, 118, 128, 138, 148, 158, 168, 178, 188, 198};
+    float[] ANSWER5 = new float[] {0, 2, 4, 6, 8, 10, 12, 14, 16, 18};
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      for (int i = 0; i < 100; i++) {
+        statement.execute(SQL_FOR_SAMPLE_S1[i]);
+        statement.execute(SQL_FOR_SAMPLE_S2[i]);
+        statement.execute(SQL_FOR_SAMPLE_S3[i]);
+        statement.execute(SQL_FOR_SAMPLE_S4[i]);
+      }
+      for (int i = 0; i < 20; i++) {
+        statement.execute(SQL_FOR_SAMPLE_S5[i]);
+      }
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      String functionName = "EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE";
+      double proportionValue = 0.1;
+
+      ResultSet resultSet0 =
+          statement.executeQuery(
+              String.format(
+                  "select "
+                      + "%s(s1, 'proportion'='%f', 'type'='%s', 'number'='%d') "
+                      + "from root.ebs.d6",
+                  functionName, proportionValue, "avg", 2));
+      int columnCount0 = resultSet0.getMetaData().getColumnCount();
+      assertEquals(1 + 1, columnCount0);
+      for (int i = 0; i < 10; i++) {
+        resultSet0.next();
+        assertEquals(ANSWER1[i], resultSet0.getInt(2));
+      }
+
+      ResultSet resultSet1 =
+          statement.executeQuery(
+              String.format(
+                  "select "
+                      + "%s(s2, 'proportion'='%f', 'type'='%s', 'number'='%d') "
+                      + "from root.ebs.d6",
+                  functionName, proportionValue, "stendis", 2));
+      int columnCount1 = resultSet1.getMetaData().getColumnCount();
+      assertEquals(1 + 1, columnCount1);
+      for (int i = 0; i < 10; i++) {
+        resultSet1.next();
+        assertEquals(ANSWER2[i], resultSet1.getLong(2));
+      }
+
+      ResultSet resultSet2 =
+          statement.executeQuery(
+              String.format(
+                  "select "
+                      + "%s(s3, 'proportion'='%f', 'type'='%s', 'number'='%d') "
+                      + "from root.ebs.d6",
+                  functionName, proportionValue, "cos", 2));
+      int columnCount2 = resultSet2.getMetaData().getColumnCount();
+      assertEquals(1 + 1, columnCount2);
+      for (int i = 0; i < 10; i++) {
+        resultSet2.next();
+        assertEquals(ANSWER3[i], resultSet2.getDouble(2), 0.01);
+      }
+
+      ResultSet resultSet3 =
+          statement.executeQuery(
+              String.format(
+                  "select "
+                      + "%s(s4, 'proportion'='%f', 'type'='%s', 'number'='%d') "
+                      + "from root.ebs.d6",
+                  functionName, proportionValue, "prenextdis", 2));
+      int columnCount3 = resultSet3.getMetaData().getColumnCount();
+      assertEquals(1 + 1, columnCount3);
+      for (int i = 0; i < 10; i++) {
+        resultSet3.next();
+        assertEquals(ANSWER4[i], resultSet3.getFloat(2), 0.01);
+      }
+
+      ResultSet resultSet4 =
+          statement.executeQuery(
+              String.format(
+                  "select "
+                      + "%s(s5, 'proportion'='%f', 'type'='%s', 'number'='%d') "
+                      + "from root.ebs.d6",
+                  functionName, 0.5, "cos", 1));
+      int columnCount4 = resultSet4.getMetaData().getColumnCount();
+      assertEquals(1 + 1, columnCount4);
+      for (int i = 0; i < 10; i++) {
+        resultSet4.next();
+        assertEquals(ANSWER5[i], resultSet4.getFloat(2), 0.01);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void testM4Function() {
+    // create timeseries
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("SET STORAGE GROUP TO root.m4");
+      statement.execute("CREATE TIMESERIES root.m4.d1.s1 with datatype=double,encoding=PLAIN");
+      statement.execute("CREATE TIMESERIES root.m4.d1.s2 with datatype=INT32,encoding=PLAIN");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+
+    // insert data
+    String insertTemplate = "INSERT INTO root.m4.d1(timestamp,%s)" + " VALUES(%d,%d)";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      // "root.m4.d1.s1" data illustration:
+      // https://user-images.githubusercontent.com/33376433/151985070-73158010-8ba0-409d-a1c1-df69bad1aaee.png
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 1, 5));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 2, 15));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 20, 1));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 25, 8));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 54, 3));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 120, 8));
+      statement.execute("FLUSH");
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 5, 10));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 8, 8));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 10, 30));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 20, 20));
+      statement.execute("FLUSH");
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 27, 20));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 30, 40));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 35, 10));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 40, 20));
+      statement.execute("FLUSH");
+
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 33, 9));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 45, 30));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 52, 8));
+      statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s1", 54, 18));
+      statement.execute("FLUSH");
+
+      // "root.m4.d1.s2" data: constant value 1
+      for (int i = 0; i < 100; i++) {
+        statement.execute(String.format(Locale.ENGLISH, insertTemplate, "s2", i, 1));
+      }
+      statement.execute("FLUSH");
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+
+    // query tests
+    test_M4_firstWindowEmpty();
+    test_M4_slidingTimeWindow();
+    test_M4_slidingSizeWindow();
+    test_M4_constantTimeSeries();
+  }
+
+  private void test_M4_firstWindowEmpty() {
+    String[] res = new String[] {"120,8.0"};
+
+    String sql =
+        String.format(
+            "select M4(s1, '%s'='%s','%s'='%s','%s'='%s','%s'='%s') from root.m4.d1",
+            TIME_INTERVAL_KEY,
+            25,
+            SLIDING_STEP_KEY,
+            25,
+            DISPLAY_WINDOW_BEGIN_KEY,
+            75,
+            DISPLAY_WINDOW_END_KEY,
+            150);
+
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement statement = conn.createStatement()) {
+      ResultSet resultSet = statement.executeQuery(sql);
+      int count = 0;
+      while (resultSet.next()) {
+        String str = resultSet.getString(1) + "," + resultSet.getString(2);
+        Assert.assertEquals(res[count], str);
+        count++;
+      }
+      Assert.assertEquals(res.length, count);
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private void test_M4_slidingTimeWindow() {
+    String[] res =
+        new String[] {
+          "1,5.0", "10,30.0", "20,20.0", "25,8.0", "30,40.0", "45,30.0", "52,8.0", "54,18.0",
+          "120,8.0"
+        };
+
+    String sql =
+        String.format(
+            "select M4(s1, '%s'='%s','%s'='%s','%s'='%s','%s'='%s') from root.m4.d1",
+            TIME_INTERVAL_KEY,
+            25,
+            SLIDING_STEP_KEY,
+            25,
+            DISPLAY_WINDOW_BEGIN_KEY,
+            0,
+            DISPLAY_WINDOW_END_KEY,
+            150);
+
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement statement = conn.createStatement()) {
+      ResultSet resultSet = statement.executeQuery(sql);
+      int count = 0;
+      while (resultSet.next()) {
+        String str = resultSet.getString(1) + "," + resultSet.getString(2);
+        Assert.assertEquals(res[count], str);
+        count++;
+      }
+      Assert.assertEquals(res.length, count);
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private void test_M4_slidingSizeWindow() {
+    String[] res = new String[] {"1,5.0", "30,40.0", "33,9.0", "35,10.0", "45,30.0", "120,8.0"};
+
+    String sql =
+        String.format(
+            "select M4(s1,'%s'='%s','%s'='%s') from root.m4.d1",
+            WINDOW_SIZE_KEY, 10, SLIDING_STEP_KEY, 10);
+
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement statement = conn.createStatement()) {
+      ResultSet resultSet = statement.executeQuery(sql);
+      int count = 0;
+      while (resultSet.next()) {
+        String str = resultSet.getString(1) + "," + resultSet.getString(2);
+        Assert.assertEquals(res[count], str);
+        count++;
+      }
+      Assert.assertEquals(res.length, count);
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private void test_M4_constantTimeSeries() {
+    /* Result: 0,1 24,1 25,1 49,1 50,1 74,1 75,1 99,1 */
+    String sql =
+        String.format(
+            "select M4(s2, '%s'='%s','%s'='%s','%s'='%s','%s'='%s') from root.m4.d1",
+            TIME_INTERVAL_KEY,
+            25,
+            SLIDING_STEP_KEY,
+            25,
+            DISPLAY_WINDOW_BEGIN_KEY,
+            0,
+            DISPLAY_WINDOW_END_KEY,
+            100);
+
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement statement = conn.createStatement()) {
+      ResultSet resultSet = statement.executeQuery(sql);
+      int count = 0;
+      while (resultSet.next()) {
+        String expStr;
+        if (count % 2 == 0) {
+          expStr = 25 * (count / 2) + ",1";
+        } else {
+          expStr = 25 * (count / 2) + 24 + ",1";
+        }
+        String str = resultSet.getString(1) + "," + resultSet.getString(2);
+        Assert.assertEquals(expStr, str);
+        count++;
+      }
+      Assert.assertEquals(8, count);
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
 }
diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index 621b156613..91bdf0c3b9 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -203,7 +203,7 @@
                                                 </goals>
                                             </pluginExecutionFilter>
                                             <action>
-                                                <ignore />
+                                                <ignore/>
                                             </action>
                                         </pluginExecution>
                                     </pluginExecutions>
diff --git a/pom.xml b/pom.xml
index 50ed4dc745..88637a4a5d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -162,7 +162,7 @@
         <sonar.junit.reportPaths>target/surefire-reports,target/failsafe-reports</sonar.junit.reportPaths>
         <!-- By default, the argLine is empty-->
         <gson.version>2.8.8</gson.version>
-        <argLine />
+        <argLine/>
         <!-- whether enable compiling the cpp client-->
         <client-cpp>false</client-cpp>
         <!-- disable enforcer by default-->
@@ -690,7 +690,7 @@
                             <importOrder>
                                 <order>org.apache.iotdb,,javax,java,\#</order>
                             </importOrder>
-                            <removeUnusedImports />
+                            <removeUnusedImports/>
                         </java>
                         <lineEndings>UNIX</lineEndings>
                     </configuration>
@@ -765,7 +765,7 @@
                         <phase>validate</phase>
                         <configuration>
                             <rules>
-                                <dependencyConvergence />
+                                <dependencyConvergence/>
                             </rules>
                         </configuration>
                         <goals>
@@ -811,7 +811,7 @@
                                 </requireJavaVersion>
                                 <!-- Disabled for now as it breaks the ability to build single modules -->
                                 <!--reactorModuleConvergence/-->
-                                <banVulnerable implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies" />
+                                <banVulnerable implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies"/>
                             </rules>
                         </configuration>
                     </execution>
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java
index d4111a54b6..e6c984f95a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java
@@ -60,7 +60,13 @@ public enum BuiltinFunction {
   NON_ZERO_DURATION("NON_ZERO_DURATION", UDTFNonZeroDuration.class),
   ZERO_COUNT("ZERO_COUNT", UDTFZeroCount.class),
   NON_ZERO_COUNT("NON_ZERO_COUNT", UDTFNonZeroCount.class),
-  ;
+  EQUAL_SIZE_BUCKET_RANDOM_SAMPLE(
+      "EQUAL_SIZE_BUCKET_RANDOM_SAMPLE", UDTFEqualSizeBucketRandomSample.class),
+  EQUAL_SIZE_BUCKET_AGG_SAMPLE("EQUAL_SIZE_BUCKET_AGG_SAMPLE", UDTFEqualSizeBucketAggSample.class),
+  EQUAL_SIZE_BUCKET_M4_SAMPLE("EQUAL_SIZE_BUCKET_M4_SAMPLE", UDTFEqualSizeBucketM4Sample.class),
+  EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE(
+      "EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE", UDTFEqualSizeBucketOutlierSample.class),
+  M4("M4", UDTFM4.class);
 
   private final String functionName;
   private final Class<?> functionClass;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketAggSample.java b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketAggSample.java
new file mode 100644
index 0000000000..b44e4eb21c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketAggSample.java
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.builtin;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.query.udf.api.access.RowWindow;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.exception.UDFException;
+import org.apache.iotdb.db.query.udf.api.exception.UDFInputSeriesDataTypeNotValidException;
+import org.apache.iotdb.db.query.udf.api.exception.UDFParameterNotValidException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.io.IOException;
+
+public class UDTFEqualSizeBucketAggSample extends UDTFEqualSizeBucketSample {
+
+  private String aggMethodType;
+  private Aggregator aggregator;
+
+  private interface Aggregator {
+
+    void aggregateInt(RowWindow rowWindow, PointCollector collector) throws IOException;
+
+    void aggregateLong(RowWindow rowWindow, PointCollector collector) throws IOException;
+
+    void aggregateFloat(RowWindow rowWindow, PointCollector collector) throws IOException;
+
+    void aggregateDouble(RowWindow rowWindow, PointCollector collector) throws IOException;
+  }
+
+  private static class AvgAggregator implements Aggregator {
+
+    @Override
+    public void aggregateInt(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      double sum = 0;
+      for (int i = 0; i < windowSize; i++) {
+        sum += rowWindow.getRow(i).getInt(0);
+      }
+      collector.putDouble(time, sum / windowSize);
+    }
+
+    @Override
+    public void aggregateLong(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      double sum = 0;
+      for (int i = 0; i < windowSize; i++) {
+        sum += rowWindow.getRow(i).getLong(0);
+      }
+      collector.putDouble(time, sum / windowSize);
+    }
+
+    @Override
+    public void aggregateFloat(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      double sum = 0;
+      for (int i = 0; i < windowSize; i++) {
+        sum += rowWindow.getRow(i).getFloat(0);
+      }
+      collector.putDouble(time, sum / windowSize);
+    }
+
+    @Override
+    public void aggregateDouble(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      double sum = 0;
+      for (int i = 0; i < windowSize; i++) {
+        sum += rowWindow.getRow(i).getDouble(0);
+      }
+      collector.putDouble(time, sum / windowSize);
+    }
+  }
+
+  private static class MinAggregator implements Aggregator {
+
+    @Override
+    public void aggregateInt(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      int minValue = rowWindow.getRow(0).getInt(0);
+      for (int i = 1; i < windowSize; i++) {
+        int value = rowWindow.getRow(i).getInt(0);
+        if (minValue > value) {
+          minValue = value;
+        }
+      }
+      collector.putInt(time, minValue);
+    }
+
+    @Override
+    public void aggregateLong(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      long minValue = rowWindow.getRow(0).getLong(0);
+      for (int i = 1; i < windowSize; i++) {
+        long value = rowWindow.getRow(i).getLong(0);
+        if (minValue > value) {
+          minValue = value;
+        }
+      }
+      collector.putLong(time, minValue);
+    }
+
+    @Override
+    public void aggregateFloat(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      float minValue = rowWindow.getRow(0).getFloat(0);
+      for (int i = 1; i < windowSize; i++) {
+        float value = rowWindow.getRow(i).getFloat(0);
+        if (minValue > value) {
+          minValue = value;
+        }
+      }
+      collector.putFloat(time, minValue);
+    }
+
+    @Override
+    public void aggregateDouble(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      double minValue = rowWindow.getRow(0).getDouble(0);
+      for (int i = 1; i < windowSize; i++) {
+        double value = rowWindow.getRow(i).getDouble(0);
+        if (minValue > value) {
+          minValue = value;
+        }
+      }
+      collector.putDouble(time, minValue);
+    }
+  }
+
+  private static class MaxAggregator implements Aggregator {
+
+    @Override
+    public void aggregateInt(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      int maxValue = rowWindow.getRow(0).getInt(0);
+      for (int i = 1; i < windowSize; i++) {
+        int value = rowWindow.getRow(i).getInt(0);
+        if (maxValue < value) {
+          maxValue = value;
+        }
+      }
+      collector.putInt(time, maxValue);
+    }
+
+    @Override
+    public void aggregateLong(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      long maxValue = rowWindow.getRow(0).getLong(0);
+      for (int i = 1; i < windowSize; i++) {
+        long value = rowWindow.getRow(i).getLong(0);
+        if (maxValue < value) {
+          maxValue = value;
+        }
+      }
+      collector.putLong(time, maxValue);
+    }
+
+    @Override
+    public void aggregateFloat(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      float maxValue = rowWindow.getRow(0).getFloat(0);
+      for (int i = 1; i < windowSize; i++) {
+        float value = rowWindow.getRow(i).getFloat(0);
+        if (maxValue < value) {
+          maxValue = value;
+        }
+      }
+      collector.putFloat(time, maxValue);
+    }
+
+    @Override
+    public void aggregateDouble(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      double maxValue = rowWindow.getRow(0).getDouble(0);
+      for (int i = 1; i < windowSize; i++) {
+        double value = rowWindow.getRow(i).getDouble(0);
+        if (maxValue < value) {
+          maxValue = value;
+        }
+      }
+      collector.putDouble(time, maxValue);
+    }
+  }
+
+  private static class SumAggregator implements Aggregator {
+
+    @Override
+    public void aggregateInt(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      int sum = 0;
+      for (int i = 0; i < windowSize; i++) {
+        sum += rowWindow.getRow(i).getInt(0);
+      }
+      collector.putInt(time, sum);
+    }
+
+    @Override
+    public void aggregateLong(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      long sum = 0;
+      for (int i = 0; i < windowSize; i++) {
+        sum += rowWindow.getRow(i).getLong(0);
+      }
+      collector.putLong(time, sum);
+    }
+
+    @Override
+    public void aggregateFloat(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      float sum = 0;
+      for (int i = 0; i < windowSize; i++) {
+        sum += rowWindow.getRow(i).getFloat(0);
+      }
+      collector.putFloat(time, sum);
+    }
+
+    @Override
+    public void aggregateDouble(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      double sum = 0;
+      for (int i = 0; i < windowSize; i++) {
+        sum += rowWindow.getRow(i).getDouble(0);
+      }
+      collector.putDouble(time, sum);
+    }
+  }
+
+  private static class ExtremeAggregator implements Aggregator {
+
+    @Override
+    public void aggregateInt(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      int extreme = 0;
+      for (int i = 0; i < windowSize; i++) {
+        int origin = rowWindow.getRow(i).getInt(0);
+        int value = origin > 0 ? origin : -origin;
+        if (extreme < value) {
+          extreme = origin;
+        }
+      }
+      collector.putInt(time, extreme);
+    }
+
+    @Override
+    public void aggregateLong(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      long extreme = 0;
+      for (int i = 0; i < windowSize; i++) {
+        long origin = rowWindow.getRow(i).getLong(0);
+        long value = origin > 0 ? origin : -origin;
+        if (extreme < value) {
+          extreme = origin;
+        }
+      }
+      collector.putLong(time, extreme);
+    }
+
+    @Override
+    public void aggregateFloat(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      float extreme = 0;
+      for (int i = 0; i < windowSize; i++) {
+        float origin = rowWindow.getRow(i).getFloat(0);
+        float value = origin > 0 ? origin : -origin;
+        if (extreme < value) {
+          extreme = origin;
+        }
+      }
+      collector.putFloat(time, extreme);
+    }
+
+    @Override
+    public void aggregateDouble(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      double extreme = 0;
+      for (int i = 0; i < windowSize; i++) {
+        double origin = rowWindow.getRow(i).getDouble(0);
+        double value = origin > 0 ? origin : -origin;
+        if (extreme < value) {
+          extreme = origin;
+        }
+      }
+      collector.putDouble(time, extreme);
+    }
+  }
+
+  private static class VarianceAggregator implements Aggregator {
+
+    @Override
+    public void aggregateInt(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      double avg = 0, sum = 0;
+      for (int i = 0; i < windowSize; i++) {
+        avg += rowWindow.getRow(i).getInt(0);
+      }
+      avg /= windowSize;
+      for (int i = 0; i < windowSize; i++) {
+        double delta = rowWindow.getRow(i).getInt(0) - avg;
+        sum += delta * delta;
+      }
+      collector.putDouble(time, sum / windowSize);
+    }
+
+    @Override
+    public void aggregateLong(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      double avg = 0, sum = 0;
+      for (int i = 0; i < windowSize; i++) {
+        avg += rowWindow.getRow(i).getLong(0);
+      }
+      avg /= windowSize;
+      for (int i = 0; i < windowSize; i++) {
+        double delta = rowWindow.getRow(i).getLong(0) - avg;
+        sum += delta * delta;
+      }
+      collector.putDouble(time, sum / windowSize);
+    }
+
+    @Override
+    public void aggregateFloat(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      double avg = 0, sum = 0;
+      for (int i = 0; i < windowSize; i++) {
+        avg += rowWindow.getRow(i).getFloat(0);
+      }
+      avg /= windowSize;
+      for (int i = 0; i < windowSize; i++) {
+        double delta = rowWindow.getRow(i).getFloat(0) - avg;
+        sum += delta * delta;
+      }
+      collector.putDouble(time, sum / windowSize);
+    }
+
+    @Override
+    public void aggregateDouble(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      double avg = 0, sum = 0;
+      for (int i = 0; i < windowSize; i++) {
+        avg += rowWindow.getRow(i).getDouble(0);
+      }
+      avg /= windowSize;
+      for (int i = 0; i < windowSize; i++) {
+        double delta = rowWindow.getRow(i).getDouble(0) - avg;
+        sum += delta * delta;
+      }
+      collector.putDouble(time, sum / windowSize);
+    }
+  }
+
+  @Override
+  public void validate(UDFParameterValidator validator) throws MetadataException, UDFException {
+    super.validate(validator);
+    aggMethodType = validator.getParameters().getStringOrDefault("type", "avg").toLowerCase();
+    validator.validate(
+        type ->
+            "avg".equals(type)
+                || "max".equals(type)
+                || "min".equals(type)
+                || "sum".equals(type)
+                || "extreme".equals(type)
+                || "variance".equals(type),
+        "Illegal aggregation method. Aggregation type should be avg, min, max, sum, extreme, variance.",
+        aggMethodType);
+  }
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws UDFParameterNotValidException {
+    // if we use aggregation method on average or variance, the outputDataType may be double.
+    // For other scenarios, outputDataType == dataType
+    TSDataType outputDataType = dataType;
+    if ("avg".equals(aggMethodType) || "variance".equals(aggMethodType)) {
+      outputDataType = TSDataType.DOUBLE;
+    }
+    configurations
+        .setAccessStrategy(new SlidingSizeWindowAccessStrategy(bucketSize))
+        .setOutputDataType(outputDataType);
+    switch (aggMethodType) {
+      case "avg":
+        aggregator = new AvgAggregator();
+        break;
+      case "min":
+        aggregator = new MinAggregator();
+        break;
+      case "max":
+        aggregator = new MaxAggregator();
+        break;
+      case "sum":
+        aggregator = new SumAggregator();
+        break;
+      case "extreme":
+        aggregator = new ExtremeAggregator();
+        break;
+      case "variance":
+        aggregator = new VarianceAggregator();
+        break;
+      default:
+        throw new UDFParameterNotValidException(
+            "Illegal aggregation method. Aggregation type should be avg, min, max, sum, extreme, variance.");
+    }
+  }
+
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector)
+      throws IOException, UDFParameterNotValidException {
+    switch (dataType) {
+      case INT32:
+        aggregator.aggregateInt(rowWindow, collector);
+        break;
+      case INT64:
+        aggregator.aggregateLong(rowWindow, collector);
+        break;
+      case FLOAT:
+        aggregator.aggregateFloat(rowWindow, collector);
+        break;
+      case DOUBLE:
+        aggregator.aggregateDouble(rowWindow, collector);
+        break;
+      default:
+        // This will not happen
+        throw new UDFInputSeriesDataTypeNotValidException(
+            0, dataType, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketM4Sample.java b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketM4Sample.java
new file mode 100644
index 0000000000..f50a384f5a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketM4Sample.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.udf.builtin;
+
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.access.RowWindow;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.exception.UDFException;
+import org.apache.iotdb.db.query.udf.api.exception.UDFInputSeriesDataTypeNotValidException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.io.IOException;
+
+public class UDTFEqualSizeBucketM4Sample extends UDTFEqualSizeBucketSample {
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) {
+    bucketSize *= 4;
+    configurations
+        .setAccessStrategy(new SlidingSizeWindowAccessStrategy(bucketSize))
+        .setOutputDataType(dataType);
+  }
+
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector)
+      throws UDFException, IOException {
+    switch (dataType) {
+      case INT32:
+        transformInt(rowWindow, collector);
+        break;
+      case INT64:
+        transformLong(rowWindow, collector);
+        break;
+      case FLOAT:
+        transformFloat(rowWindow, collector);
+        break;
+      case DOUBLE:
+        transformDouble(rowWindow, collector);
+        break;
+      default:
+        // This will not happen
+        throw new UDFInputSeriesDataTypeNotValidException(
+            0, dataType, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE);
+    }
+  }
+
+  public void transformInt(RowWindow rowWindow, PointCollector collector) throws IOException {
+    if (rowWindow.windowSize() <= 4) {
+      for (int i = 0; i < rowWindow.windowSize(); i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putInt(row.getTime(), row.getInt(0));
+      }
+      return;
+    }
+
+    int minIndex = 1, maxIndex = 1;
+    int maxValue = rowWindow.getRow(1).getInt(0);
+    int minValue = rowWindow.getRow(1).getInt(0);
+    for (int i = 2; i < rowWindow.windowSize() - 1; i++) {
+      int value = rowWindow.getRow(i).getInt(0);
+      if (minValue > value) {
+        minValue = value;
+        minIndex = i;
+      }
+      if (maxValue < value) {
+        maxValue = value;
+        maxIndex = i;
+      }
+    }
+    if (minIndex == maxIndex) {
+      maxIndex = rowWindow.windowSize() - 2;
+    }
+
+    Row row = rowWindow.getRow(0);
+    collector.putInt(row.getTime(), row.getInt(0));
+    if (maxIndex < minIndex) {
+      row = rowWindow.getRow(maxIndex);
+      collector.putInt(row.getTime(), row.getInt(0));
+      row = rowWindow.getRow(minIndex);
+    } else {
+      row = rowWindow.getRow(minIndex);
+      collector.putInt(row.getTime(), row.getInt(0));
+      row = rowWindow.getRow(maxIndex);
+    }
+    collector.putInt(row.getTime(), row.getInt(0));
+    row = rowWindow.getRow(rowWindow.windowSize() - 1);
+    collector.putInt(row.getTime(), row.getInt(0));
+  }
+
+  public void transformLong(RowWindow rowWindow, PointCollector collector) throws IOException {
+    if (rowWindow.windowSize() <= 4) {
+      for (int i = 0; i < rowWindow.windowSize(); i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putLong(row.getTime(), row.getLong(0));
+      }
+      return;
+    }
+
+    int minIndex = 1, maxIndex = 1;
+    long maxValue = rowWindow.getRow(1).getLong(0);
+    long minValue = rowWindow.getRow(1).getLong(0);
+    for (int i = 2; i < rowWindow.windowSize() - 1; i++) {
+      long value = rowWindow.getRow(i).getLong(0);
+      if (minValue > value) {
+        minValue = value;
+        minIndex = i;
+      }
+      if (maxValue < value) {
+        maxValue = value;
+        maxIndex = i;
+      }
+    }
+    if (minIndex == maxIndex) {
+      maxIndex = rowWindow.windowSize() - 2;
+    }
+
+    Row row = rowWindow.getRow(0);
+    collector.putLong(row.getTime(), row.getLong(0));
+    if (maxIndex < minIndex) {
+      row = rowWindow.getRow(maxIndex);
+      collector.putLong(row.getTime(), row.getLong(0));
+      row = rowWindow.getRow(minIndex);
+    } else {
+      row = rowWindow.getRow(minIndex);
+      collector.putLong(row.getTime(), row.getLong(0));
+      row = rowWindow.getRow(maxIndex);
+    }
+    collector.putLong(row.getTime(), row.getLong(0));
+    row = rowWindow.getRow(rowWindow.windowSize() - 1);
+    collector.putLong(row.getTime(), row.getLong(0));
+  }
+
+  public void transformFloat(RowWindow rowWindow, PointCollector collector) throws IOException {
+    if (rowWindow.windowSize() <= 4) {
+      for (int i = 0; i < rowWindow.windowSize(); i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putFloat(row.getTime(), row.getFloat(0));
+      }
+      return;
+    }
+
+    int minIndex = 1, maxIndex = 1;
+    float maxValue = rowWindow.getRow(1).getFloat(0);
+    float minValue = rowWindow.getRow(1).getFloat(0);
+    for (int i = 2; i < rowWindow.windowSize() - 1; i++) {
+      float value = rowWindow.getRow(i).getFloat(0);
+      if (minValue > value) {
+        minValue = value;
+        minIndex = i;
+      }
+      if (maxValue < value) {
+        maxValue = value;
+        maxIndex = i;
+      }
+    }
+    if (minIndex == maxIndex) {
+      maxIndex = rowWindow.windowSize() - 2;
+    }
+
+    Row row = rowWindow.getRow(0);
+    collector.putFloat(row.getTime(), row.getFloat(0));
+    if (maxIndex < minIndex) {
+      row = rowWindow.getRow(maxIndex);
+      collector.putFloat(row.getTime(), row.getFloat(0));
+      row = rowWindow.getRow(minIndex);
+    } else {
+      row = rowWindow.getRow(minIndex);
+      collector.putFloat(row.getTime(), row.getFloat(0));
+      row = rowWindow.getRow(maxIndex);
+    }
+    collector.putFloat(row.getTime(), row.getFloat(0));
+    row = rowWindow.getRow(rowWindow.windowSize() - 1);
+    collector.putFloat(row.getTime(), row.getFloat(0));
+  }
+
+  public void transformDouble(RowWindow rowWindow, PointCollector collector) throws IOException {
+    if (rowWindow.windowSize() <= 4) {
+      for (int i = 0; i < rowWindow.windowSize(); i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putDouble(row.getTime(), row.getDouble(0));
+      }
+      return;
+    }
+
+    int minIndex = 1, maxIndex = 1;
+    double maxValue = rowWindow.getRow(1).getDouble(0);
+    double minValue = rowWindow.getRow(1).getDouble(0);
+    for (int i = 2; i < rowWindow.windowSize() - 1; i++) {
+      double value = rowWindow.getRow(i).getDouble(0);
+      if (minValue > value) {
+        minValue = value;
+        minIndex = i;
+      }
+      if (maxValue < value) {
+        maxValue = value;
+        maxIndex = i;
+      }
+    }
+    if (minIndex == maxIndex) {
+      maxIndex = rowWindow.windowSize() - 2;
+    }
+
+    Row row = rowWindow.getRow(0);
+    collector.putDouble(row.getTime(), row.getDouble(0));
+    if (maxIndex < minIndex) {
+      row = rowWindow.getRow(maxIndex);
+      collector.putDouble(row.getTime(), row.getDouble(0));
+      row = rowWindow.getRow(minIndex);
+    } else {
+      row = rowWindow.getRow(minIndex);
+      collector.putDouble(row.getTime(), row.getDouble(0));
+      row = rowWindow.getRow(maxIndex);
+    }
+    collector.putDouble(row.getTime(), row.getDouble(0));
+    row = rowWindow.getRow(rowWindow.windowSize() - 1);
+    collector.putDouble(row.getTime(), row.getDouble(0));
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketOutlierSample.java b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketOutlierSample.java
new file mode 100644
index 0000000000..170e29a906
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketOutlierSample.java
@@ -0,0 +1,834 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.builtin;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.access.RowWindow;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.exception.UDFException;
+import org.apache.iotdb.db.query.udf.api.exception.UDFInputSeriesDataTypeNotValidException;
+import org.apache.iotdb.db.query.udf.api.exception.UDFParameterNotValidException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.PriorityQueue;
+
+public class UDTFEqualSizeBucketOutlierSample extends UDTFEqualSizeBucketSample {
+
+  private String type;
+  private int number;
+  private OutlierSampler outlierSampler;
+
+  private interface OutlierSampler {
+
+    void outlierSampleInt(RowWindow rowWindow, PointCollector collector) throws IOException;
+
+    void outlierSampleLong(RowWindow rowWindow, PointCollector collector) throws IOException;
+
+    void outlierSampleFloat(RowWindow rowWindow, PointCollector collector) throws IOException;
+
+    void outlierSampleDouble(RowWindow rowWindow, PointCollector collector) throws IOException;
+  }
+
+  private class AvgOutlierSampler implements OutlierSampler {
+
+    @Override
+    public void outlierSampleInt(RowWindow rowWindow, PointCollector collector) throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (windowSize <= number) {
+        for (int i = 0; i < windowSize; i++) {
+          Row row = rowWindow.getRow(i);
+          collector.putInt(row.getTime(), row.getInt(0));
+        }
+        return;
+      }
+
+      double avg = 0;
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> o.right));
+      for (int i = 0; i < windowSize; i++) {
+        avg += rowWindow.getRow(i).getInt(0);
+      }
+      avg /= windowSize;
+      for (int i = 0; i < windowSize; i++) {
+        double value = Math.abs(rowWindow.getRow(i).getInt(0) - avg);
+        addToMinHeap(pq, i, value);
+      }
+
+      putPQValueInt(pq, rowWindow, collector);
+    }
+
+    @Override
+    public void outlierSampleLong(RowWindow rowWindow, PointCollector collector)
+        throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (windowSize <= number) {
+        for (int i = 0; i < windowSize; i++) {
+          Row row = rowWindow.getRow(i);
+          collector.putLong(row.getTime(), row.getLong(0));
+        }
+        return;
+      }
+
+      double avg = 0;
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> o.right));
+      for (int i = 0; i < windowSize; i++) {
+        avg += rowWindow.getRow(i).getLong(0);
+      }
+      avg /= windowSize;
+      for (int i = 0; i < windowSize; i++) {
+        double value = Math.abs(rowWindow.getRow(i).getLong(0) - avg);
+        addToMinHeap(pq, i, value);
+      }
+
+      putPQValueLong(pq, rowWindow, collector);
+    }
+
+    @Override
+    public void outlierSampleFloat(RowWindow rowWindow, PointCollector collector)
+        throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (windowSize <= number) {
+        for (int i = 0; i < windowSize; i++) {
+          Row row = rowWindow.getRow(i);
+          collector.putFloat(row.getTime(), row.getFloat(0));
+        }
+        return;
+      }
+
+      double avg = 0;
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> o.right));
+      for (int i = 0; i < windowSize; i++) {
+        avg += rowWindow.getRow(i).getFloat(0);
+      }
+      avg /= windowSize;
+      for (int i = 0; i < windowSize; i++) {
+        double value = Math.abs(rowWindow.getRow(i).getFloat(0) - avg);
+        addToMinHeap(pq, i, value);
+      }
+
+      putPQValueFloat(pq, rowWindow, collector);
+    }
+
+    @Override
+    public void outlierSampleDouble(RowWindow rowWindow, PointCollector collector)
+        throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (windowSize <= number) {
+        for (int i = 0; i < windowSize; i++) {
+          Row row = rowWindow.getRow(i);
+          collector.putDouble(row.getTime(), row.getDouble(0));
+        }
+        return;
+      }
+
+      double avg = 0;
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> o.right));
+      for (int i = 0; i < windowSize; i++) {
+        avg += rowWindow.getRow(i).getDouble(0);
+      }
+      avg /= windowSize;
+      for (int i = 0; i < windowSize; i++) {
+        double value = Math.abs(rowWindow.getRow(i).getDouble(0) - avg);
+        addToMinHeap(pq, i, value);
+      }
+
+      putPQValueDouble(pq, rowWindow, collector);
+    }
+  }
+
+  private class StendisOutlierSampler implements OutlierSampler {
+
+    @Override
+    public void outlierSampleInt(RowWindow rowWindow, PointCollector collector) throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (isWindowSizeTooSmallInt(rowWindow, collector, windowSize)) {
+        return;
+      }
+
+      long row0x = rowWindow.getRow(0).getTime(),
+          row1x = rowWindow.getRow(windowSize - 1).getTime();
+      int row0y = rowWindow.getRow(0).getInt(0), row1y = rowWindow.getRow(windowSize - 1).getInt(0);
+
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> o.right));
+
+      double A = (double) row0y - row1y;
+      double B = (double) row1x - row0x;
+      double C = (double) row0x * row1y - row1x * row0y;
+      double denominator = Math.sqrt(A * A + B * B);
+
+      for (int i = 1; i < windowSize - 1; i++) {
+        Row row = rowWindow.getRow(i);
+        double value = Math.abs(A * row.getTime() + B * row.getInt(0) + C) / denominator;
+        addToMinHeap(pq, i, value);
+      }
+
+      putPQValueInt(pq, rowWindow, collector);
+    }
+
+    @Override
+    public void outlierSampleLong(RowWindow rowWindow, PointCollector collector)
+        throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (isWindowSizeTooSmallLong(rowWindow, collector, windowSize)) {
+        return;
+      }
+
+      long row0x = rowWindow.getRow(0).getTime(),
+          row1x = rowWindow.getRow(windowSize - 1).getTime();
+      long row0y = rowWindow.getRow(0).getLong(0),
+          row1y = rowWindow.getRow(windowSize - 1).getLong(0);
+
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> o.right));
+
+      double A = (double) row0y - row1y;
+      double B = (double) row1x - row0x;
+      double C = (double) row0x * row1y - row1x * row0y;
+      double denominator = Math.sqrt(A * A + B * B);
+
+      for (int i = 1; i < windowSize - 1; i++) {
+        Row row = rowWindow.getRow(i);
+        double value = Math.abs(A * row.getTime() + B * row.getLong(0) + C) / denominator;
+        addToMinHeap(pq, i, value);
+      }
+
+      putPQValueLong(pq, rowWindow, collector);
+    }
+
+    @Override
+    public void outlierSampleFloat(RowWindow rowWindow, PointCollector collector)
+        throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (isWindowSizeTooSmallFloat(rowWindow, collector, windowSize)) {
+        return;
+      }
+
+      long row0x = rowWindow.getRow(0).getTime(),
+          row1x = rowWindow.getRow(windowSize - 1).getTime();
+      float row0y = rowWindow.getRow(0).getFloat(0),
+          row1y = rowWindow.getRow(windowSize - 1).getFloat(0);
+
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> o.right));
+
+      double A = (double) row0y - row1y;
+      double B = (double) row1x - row0x;
+      double C = (double) row0x * row1y - row1x * row0y;
+      double denominator = Math.sqrt(A * A + B * B);
+
+      for (int i = 1; i < windowSize - 1; i++) {
+        Row row = rowWindow.getRow(i);
+        double value = Math.abs(A * row.getTime() + B * row.getFloat(0) + C) / denominator;
+        addToMinHeap(pq, i, value);
+      }
+
+      putPQValueFloat(pq, rowWindow, collector);
+    }
+
+    @Override
+    public void outlierSampleDouble(RowWindow rowWindow, PointCollector collector)
+        throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (isWindowSizeTooSmallDouble(rowWindow, collector, windowSize)) {
+        return;
+      }
+
+      long row0x = rowWindow.getRow(0).getTime(),
+          row1x = rowWindow.getRow(windowSize - 1).getTime();
+      double row0y = rowWindow.getRow(0).getDouble(0),
+          row1y = rowWindow.getRow(windowSize - 1).getDouble(0);
+
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> o.right));
+
+      double A = row0y - row1y;
+      double B = (double) row1x - row0x;
+      double C = row0x * row1y - row1x * row0y;
+      double denominator = Math.sqrt(A * A + B * B);
+
+      for (int i = 1; i < windowSize - 1; i++) {
+        Row row = rowWindow.getRow(i);
+        double value = Math.abs(A * row.getTime() + B * row.getDouble(0) + C) / denominator;
+        addToMinHeap(pq, i, value);
+      }
+
+      putPQValueDouble(pq, rowWindow, collector);
+    }
+  }
+
+  private class CosOutlierSampler implements OutlierSampler {
+
+    @Override
+    public void outlierSampleInt(RowWindow rowWindow, PointCollector collector) throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (isWindowSizeTooSmallInt(rowWindow, collector, windowSize)) {
+        return;
+      }
+
+      // o -> -o.right, max heap
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> -o.right));
+
+      long lastTime, currentTime, nextTime, x1, x2;
+      int lastValue, currentValue, nextValue, y1, y2;
+      double value;
+
+      for (int i = 1; i < windowSize - 1; i++) {
+        lastTime = rowWindow.getRow(i - 1).getTime();
+        currentTime = rowWindow.getRow(i).getTime();
+        nextTime = rowWindow.getRow(i + 1).getTime();
+
+        lastValue = rowWindow.getRow(i - 1).getInt(0);
+        currentValue = rowWindow.getRow(i).getInt(0);
+        nextValue = rowWindow.getRow(i + 1).getInt(0);
+
+        x1 = currentTime - lastTime;
+        x2 = nextTime - currentTime;
+        y1 = currentValue - lastValue;
+        y2 = nextValue - currentValue;
+
+        value =
+            (x1 * x2 + y1 * y2)
+                / (Math.sqrt((double) x1 * x1 + y1 * y1) * Math.sqrt((double) x2 * x2 + y2 * y2));
+
+        addToMaxHeap(pq, i, value);
+      }
+
+      putPQValueInt(pq, rowWindow, collector);
+    }
+
+    @Override
+    public void outlierSampleLong(RowWindow rowWindow, PointCollector collector)
+        throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (isWindowSizeTooSmallLong(rowWindow, collector, windowSize)) {
+        return;
+      }
+
+      // o -> -o.right, max heap
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> -o.right));
+
+      long lastTime, currentTime, nextTime, x1, x2;
+      long lastValue, currentValue, nextValue, y1, y2;
+      double value;
+
+      for (int i = 1; i < windowSize - 1; i++) {
+        lastTime = rowWindow.getRow(i - 1).getTime();
+        currentTime = rowWindow.getRow(i).getTime();
+        nextTime = rowWindow.getRow(i + 1).getTime();
+
+        lastValue = rowWindow.getRow(i - 1).getLong(0);
+        currentValue = rowWindow.getRow(i).getLong(0);
+        nextValue = rowWindow.getRow(i + 1).getLong(0);
+
+        x1 = currentTime - lastTime;
+        x2 = nextTime - currentTime;
+        y1 = currentValue - lastValue;
+        y2 = nextValue - currentValue;
+
+        value =
+            (x1 * x2 + y1 * y2)
+                / (Math.sqrt((double) x1 * x1 + y1 * y1) * Math.sqrt((double) x2 * x2 + y2 * y2));
+
+        addToMaxHeap(pq, i, value);
+      }
+
+      putPQValueLong(pq, rowWindow, collector);
+    }
+
+    @Override
+    public void outlierSampleFloat(RowWindow rowWindow, PointCollector collector)
+        throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (isWindowSizeTooSmallFloat(rowWindow, collector, windowSize)) {
+        return;
+      }
+
+      // o -> -o.right, max heap
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> -o.right));
+
+      long lastTime, currentTime, nextTime, x1, x2;
+      float lastValue, currentValue, nextValue, y1, y2;
+      double value;
+
+      for (int i = 1; i < windowSize - 1; i++) {
+        lastTime = rowWindow.getRow(i - 1).getTime();
+        currentTime = rowWindow.getRow(i).getTime();
+        nextTime = rowWindow.getRow(i + 1).getTime();
+
+        lastValue = rowWindow.getRow(i - 1).getFloat(0);
+        currentValue = rowWindow.getRow(i).getFloat(0);
+        nextValue = rowWindow.getRow(i + 1).getFloat(0);
+
+        x1 = currentTime - lastTime;
+        x2 = nextTime - currentTime;
+        y1 = currentValue - lastValue;
+        y2 = nextValue - currentValue;
+
+        value = (x1 * x2 + y1 * y2) / (Math.sqrt(x1 * x1 + y1 * y1) * Math.sqrt(x2 * x2 + y2 * y2));
+
+        addToMaxHeap(pq, i, value);
+      }
+
+      putPQValueFloat(pq, rowWindow, collector);
+    }
+
+    @Override
+    public void outlierSampleDouble(RowWindow rowWindow, PointCollector collector)
+        throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (isWindowSizeTooSmallDouble(rowWindow, collector, windowSize)) {
+        return;
+      }
+
+      // o -> -o.right, max heap
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> -o.right));
+
+      long lastTime, currentTime, nextTime, x1, x2;
+      double lastValue, currentValue, nextValue, y1, y2;
+      double value;
+
+      for (int i = 1; i < windowSize - 1; i++) {
+        lastTime = rowWindow.getRow(i - 1).getTime();
+        currentTime = rowWindow.getRow(i).getTime();
+        nextTime = rowWindow.getRow(i + 1).getTime();
+
+        lastValue = rowWindow.getRow(i - 1).getDouble(0);
+        currentValue = rowWindow.getRow(i).getDouble(0);
+        nextValue = rowWindow.getRow(i + 1).getDouble(0);
+
+        x1 = currentTime - lastTime;
+        x2 = nextTime - currentTime;
+        y1 = currentValue - lastValue;
+        y2 = nextValue - currentValue;
+
+        value = (x1 * x2 + y1 * y2) / (Math.sqrt(x1 * x1 + y1 * y1) * Math.sqrt(x2 * x2 + y2 * y2));
+
+        addToMaxHeap(pq, i, value);
+      }
+
+      putPQValueDouble(pq, rowWindow, collector);
+    }
+  }
+
+  private class PrenextdisOutlierSampler implements OutlierSampler {
+
+    @Override
+    public void outlierSampleInt(RowWindow rowWindow, PointCollector collector) throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (isWindowSizeTooSmallInt(rowWindow, collector, windowSize)) {
+        return;
+      }
+
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> o.right));
+
+      long lastTime, currentTime, nextTime, x1, x2;
+      int lastValue, currentValue, nextValue, y1, y2;
+      double value;
+
+      for (int i = 1; i < windowSize - 1; i++) {
+        lastTime = rowWindow.getRow(i - 1).getTime();
+        currentTime = rowWindow.getRow(i).getTime();
+        nextTime = rowWindow.getRow(i + 1).getTime();
+
+        lastValue = rowWindow.getRow(i - 1).getInt(0);
+        currentValue = rowWindow.getRow(i).getInt(0);
+        nextValue = rowWindow.getRow(i + 1).getInt(0);
+
+        x1 = Math.abs(currentTime - lastTime);
+        x2 = Math.abs(nextTime - currentTime);
+        y1 = Math.abs(currentValue - lastValue);
+        y2 = Math.abs(nextValue - currentValue);
+
+        value = (double) x1 + y1 + x2 + y2;
+
+        addToMinHeap(pq, i, value);
+      }
+
+      putPQValueInt(pq, rowWindow, collector);
+    }
+
+    @Override
+    public void outlierSampleLong(RowWindow rowWindow, PointCollector collector)
+        throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (isWindowSizeTooSmallLong(rowWindow, collector, windowSize)) {
+        return;
+      }
+
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> o.right));
+
+      long lastTime, currentTime, nextTime, x1, x2;
+      long lastValue, currentValue, nextValue, y1, y2;
+      double value;
+
+      for (int i = 1; i < windowSize - 1; i++) {
+        lastTime = rowWindow.getRow(i - 1).getTime();
+        currentTime = rowWindow.getRow(i).getTime();
+        nextTime = rowWindow.getRow(i + 1).getTime();
+
+        lastValue = rowWindow.getRow(i - 1).getLong(0);
+        currentValue = rowWindow.getRow(i).getLong(0);
+        nextValue = rowWindow.getRow(i + 1).getLong(0);
+
+        x1 = Math.abs(currentTime - lastTime);
+        x2 = Math.abs(nextTime - currentTime);
+        y1 = Math.abs(currentValue - lastValue);
+        y2 = Math.abs(nextValue - currentValue);
+
+        value = (double) x1 + y1 + x2 + y2;
+
+        addToMinHeap(pq, i, value);
+      }
+
+      putPQValueLong(pq, rowWindow, collector);
+    }
+
+    @Override
+    public void outlierSampleFloat(RowWindow rowWindow, PointCollector collector)
+        throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (isWindowSizeTooSmallFloat(rowWindow, collector, windowSize)) {
+        return;
+      }
+
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> o.right));
+
+      long lastTime, currentTime, nextTime, x1, x2;
+      float lastValue, currentValue, nextValue, y1, y2;
+      double value;
+
+      for (int i = 1; i < windowSize - 1; i++) {
+        lastTime = rowWindow.getRow(i - 1).getTime();
+        currentTime = rowWindow.getRow(i).getTime();
+        nextTime = rowWindow.getRow(i + 1).getTime();
+
+        lastValue = rowWindow.getRow(i - 1).getFloat(0);
+        currentValue = rowWindow.getRow(i).getFloat(0);
+        nextValue = rowWindow.getRow(i + 1).getFloat(0);
+
+        x1 = Math.abs(currentTime - lastTime);
+        x2 = Math.abs(nextTime - currentTime);
+        y1 = Math.abs(currentValue - lastValue);
+        y2 = Math.abs(nextValue - currentValue);
+
+        value = x1 + y1 + x2 + y2;
+
+        addToMinHeap(pq, i, value);
+      }
+
+      putPQValueFloat(pq, rowWindow, collector);
+    }
+
+    @Override
+    public void outlierSampleDouble(RowWindow rowWindow, PointCollector collector)
+        throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (isWindowSizeTooSmallDouble(rowWindow, collector, windowSize)) {
+        return;
+      }
+
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> o.right));
+
+      long lastTime, currentTime, nextTime, x1, x2;
+      double lastValue, currentValue, nextValue, y1, y2;
+      double value;
+
+      for (int i = 1; i < windowSize - 1; i++) {
+        lastTime = rowWindow.getRow(i - 1).getTime();
+        currentTime = rowWindow.getRow(i).getTime();
+        nextTime = rowWindow.getRow(i + 1).getTime();
+
+        lastValue = rowWindow.getRow(i - 1).getDouble(0);
+        currentValue = rowWindow.getRow(i).getDouble(0);
+        nextValue = rowWindow.getRow(i + 1).getDouble(0);
+
+        x1 = Math.abs(currentTime - lastTime);
+        x2 = Math.abs(nextTime - currentTime);
+        y1 = Math.abs(currentValue - lastValue);
+        y2 = Math.abs(nextValue - currentValue);
+
+        value = x1 + y1 + x2 + y2;
+
+        addToMinHeap(pq, i, value);
+      }
+
+      putPQValueDouble(pq, rowWindow, collector);
+    }
+  }
+
+  @Override
+  public void validate(UDFParameterValidator validator) throws UDFException, MetadataException {
+    super.validate(validator);
+    type = validator.getParameters().getStringOrDefault("type", "avg").toLowerCase();
+    number = validator.getParameters().getIntOrDefault("number", 3);
+    validator
+        .validate(
+            type ->
+                "avg".equals(type)
+                    || "stendis".equals(type)
+                    || "cos".equals(type)
+                    || "prenextdis".equals(type),
+            "Illegal outlier method. Outlier type should be avg, stendis, cos or prenextdis.",
+            type)
+        .validate(
+            number -> (int) number > 0, "Illegal number. Number should be greater than 0.", number);
+  }
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    bucketSize *= number;
+    configurations
+        .setAccessStrategy(new SlidingSizeWindowAccessStrategy(bucketSize))
+        .setOutputDataType(dataType);
+    switch (type) {
+      case "avg":
+        outlierSampler = new AvgOutlierSampler();
+        break;
+      case "stendis":
+        outlierSampler = new StendisOutlierSampler();
+        break;
+      case "cos":
+        outlierSampler = new CosOutlierSampler();
+        break;
+      case "prenextdis":
+        outlierSampler = new PrenextdisOutlierSampler();
+        break;
+      default:
+        throw new UDFParameterNotValidException(
+            "Illegal outlier method. Outlier type should be avg, stendis, cos or prenextdis.");
+    }
+  }
+
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector)
+      throws IOException, UDFParameterNotValidException {
+    switch (dataType) {
+      case INT32:
+        outlierSampler.outlierSampleInt(rowWindow, collector);
+        break;
+      case INT64:
+        outlierSampler.outlierSampleLong(rowWindow, collector);
+        break;
+      case FLOAT:
+        outlierSampler.outlierSampleFloat(rowWindow, collector);
+        break;
+      case DOUBLE:
+        outlierSampler.outlierSampleDouble(rowWindow, collector);
+        break;
+      default:
+        // This will not happen
+        throw new UDFInputSeriesDataTypeNotValidException(
+            0, dataType, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE);
+    }
+  }
+
+  public void addToMinHeap(PriorityQueue<Pair<Integer, Double>> pq, int i, double value) {
+    if (pq.size() < number) {
+      pq.add(new Pair<>(i, value));
+    } else if (value > pq.peek().right) {
+      pq.poll();
+      pq.add(new Pair<>(i, value));
+    }
+  }
+
+  public void addToMaxHeap(PriorityQueue<Pair<Integer, Double>> pq, int i, double value) {
+    if (pq.size() < number) {
+      pq.add(new Pair<>(i, value));
+    } else if (value < pq.peek().right) {
+      pq.poll();
+      pq.add(new Pair<>(i, value));
+    }
+  }
+
+  public void putPQValueInt(
+      PriorityQueue<Pair<Integer, Double>> pq, RowWindow rowWindow, PointCollector collector)
+      throws IOException {
+    int[] arr = new int[number];
+    for (int i = 0; i < number; i++) {
+      arr[i] = pq.peek().left;
+      pq.poll();
+    }
+    Arrays.sort(arr);
+    for (int i = 0; i < number; i++) {
+      collector.putInt(rowWindow.getRow(arr[i]).getTime(), rowWindow.getRow(arr[i]).getInt(0));
+    }
+  }
+
+  public void putPQValueLong(
+      PriorityQueue<Pair<Integer, Double>> pq, RowWindow rowWindow, PointCollector collector)
+      throws IOException {
+    int[] arr = new int[number];
+    for (int i = 0; i < number; i++) {
+      arr[i] = pq.peek().left;
+      pq.poll();
+    }
+    Arrays.sort(arr);
+    for (int i = 0; i < number; i++) {
+      collector.putLong(rowWindow.getRow(arr[i]).getTime(), rowWindow.getRow(arr[i]).getLong(0));
+    }
+  }
+
+  public void putPQValueFloat(
+      PriorityQueue<Pair<Integer, Double>> pq, RowWindow rowWindow, PointCollector collector)
+      throws IOException {
+    int[] arr = new int[number];
+    for (int i = 0; i < number; i++) {
+      arr[i] = pq.peek().left;
+      pq.poll();
+    }
+    Arrays.sort(arr);
+    for (int i = 0; i < number; i++) {
+      collector.putFloat(rowWindow.getRow(arr[i]).getTime(), rowWindow.getRow(arr[i]).getFloat(0));
+    }
+  }
+
+  public void putPQValueDouble(
+      PriorityQueue<Pair<Integer, Double>> pq, RowWindow rowWindow, PointCollector collector)
+      throws IOException {
+    int[] arr = new int[number];
+    for (int i = 0; i < number; i++) {
+      arr[i] = pq.peek().left;
+      pq.poll();
+    }
+    Arrays.sort(arr);
+    for (int i = 0; i < number; i++) {
+      collector.putDouble(
+          rowWindow.getRow(arr[i]).getTime(), rowWindow.getRow(arr[i]).getDouble(0));
+    }
+  }
+
+  public boolean isWindowSizeTooSmallInt(
+      RowWindow rowWindow, PointCollector collector, int windowSize) throws IOException {
+    if (windowSize <= number) {
+      for (int i = 0; i < windowSize; i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putInt(row.getTime(), row.getInt(0));
+      }
+      return true;
+    } else if (windowSize == number + 1) {
+      for (int i = 0; i < windowSize - 1; i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putInt(row.getTime(), row.getInt(0));
+      }
+      return true;
+    } else if (windowSize == number + 2) {
+      for (int i = 1; i < windowSize - 1; i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putInt(row.getTime(), row.getInt(0));
+      }
+      return true;
+    }
+    return false;
+  }
+
+  public boolean isWindowSizeTooSmallLong(
+      RowWindow rowWindow, PointCollector collector, int windowSize) throws IOException {
+    if (windowSize <= number) {
+      for (int i = 0; i < windowSize; i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putLong(row.getTime(), row.getLong(0));
+      }
+      return true;
+    } else if (windowSize == number + 1) {
+      for (int i = 0; i < windowSize - 1; i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putLong(row.getTime(), row.getLong(0));
+      }
+      return true;
+    } else if (windowSize == number + 2) {
+      for (int i = 1; i < windowSize - 1; i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putLong(row.getTime(), row.getLong(0));
+      }
+      return true;
+    }
+    return false;
+  }
+
+  public boolean isWindowSizeTooSmallFloat(
+      RowWindow rowWindow, PointCollector collector, int windowSize) throws IOException {
+    if (windowSize <= number) {
+      for (int i = 0; i < windowSize; i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putFloat(row.getTime(), row.getFloat(0));
+      }
+      return true;
+    } else if (windowSize == number + 1) {
+      for (int i = 0; i < windowSize - 1; i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putFloat(row.getTime(), row.getFloat(0));
+      }
+      return true;
+    } else if (windowSize == number + 2) {
+      for (int i = 1; i < windowSize - 1; i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putFloat(row.getTime(), row.getFloat(0));
+      }
+      return true;
+    }
+    return false;
+  }
+
+  public boolean isWindowSizeTooSmallDouble(
+      RowWindow rowWindow, PointCollector collector, int windowSize) throws IOException {
+    if (windowSize <= number) {
+      for (int i = 0; i < windowSize; i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putDouble(row.getTime(), row.getDouble(0));
+      }
+      return true;
+    } else if (windowSize == number + 1) {
+      for (int i = 0; i < windowSize - 1; i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putDouble(row.getTime(), row.getDouble(0));
+      }
+      return true;
+    } else if (windowSize == number + 2) {
+      for (int i = 1; i < windowSize - 1; i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putDouble(row.getTime(), row.getDouble(0));
+      }
+      return true;
+    }
+    return false;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketRandomSample.java b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketRandomSample.java
new file mode 100644
index 0000000000..d02cb8ff38
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketRandomSample.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.builtin;
+
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.access.RowWindow;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.exception.UDFInputSeriesDataTypeNotValidException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.io.IOException;
+import java.util.Random;
+
+public class UDTFEqualSizeBucketRandomSample extends UDTFEqualSizeBucketSample {
+
+  private Random random;
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) {
+    random = new Random();
+    configurations
+        .setAccessStrategy(new SlidingSizeWindowAccessStrategy(bucketSize))
+        .setOutputDataType(dataType);
+  }
+
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector)
+      throws IOException, UDFInputSeriesDataTypeNotValidException {
+    Row row = rowWindow.getRow(random.nextInt(rowWindow.windowSize()));
+    switch (dataType) {
+      case INT32:
+        collector.putInt(row.getTime(), row.getInt(0));
+        break;
+      case INT64:
+        collector.putLong(row.getTime(), row.getLong(0));
+        break;
+      case FLOAT:
+        collector.putFloat(row.getTime(), row.getFloat(0));
+        break;
+      case DOUBLE:
+        collector.putDouble(row.getTime(), row.getDouble(0));
+        break;
+      default:
+        // This will not happen
+        throw new UDFInputSeriesDataTypeNotValidException(
+            0, dataType, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketSample.java b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketSample.java
new file mode 100644
index 0000000000..3d554140cb
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketSample.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.builtin;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.query.udf.api.UDTF;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.db.query.udf.api.exception.UDFException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+public abstract class UDTFEqualSizeBucketSample implements UDTF {
+
+  protected TSDataType dataType;
+  protected double proportion;
+  protected int bucketSize;
+
+  @Override
+  public void validate(UDFParameterValidator validator) throws MetadataException, UDFException {
+    proportion = validator.getParameters().getDoubleOrDefault("proportion", 0.1);
+    validator
+        .validateInputSeriesNumber(1)
+        .validateInputSeriesDataType(
+            0, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE)
+        .validate(
+            proportion -> (double) proportion > 0 && (double) proportion <= 1,
+            "Illegal sample proportion. proportion > 0 and proportion <= 1",
+            proportion);
+    dataType = validator.getParameters().getDataType(0);
+    bucketSize = (int) (1 / proportion);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFM4.java b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFM4.java
new file mode 100644
index 0000000000..0ac1258363
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFM4.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.builtin;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.query.udf.api.UDTF;
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.access.RowWindow;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.exception.UDFException;
+import org.apache.iotdb.db.query.udf.api.exception.UDFInputSeriesDataTypeNotValidException;
+import org.apache.iotdb.db.query.udf.api.exception.UDFParameterNotValidException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.io.IOException;
+
+/**
+ * For each sliding window, M4 returns the first, last, bottom, top points. The window can be
+ * controlled by either point size or time interval length. The aggregated points in the output
+ * series have been sorted and deduplicated.
+ *
+ * <p>SlidingSizeWindow usage Example: "select M4(s1,'windowSize'='10','slidingStep'='10') from
+ * root.vehicle.d1" (windowSize is required, slidingStep is optional.)
+ *
+ * <p>SlidingTimeWindow usage Example: "select
+ * M4(s1,'timeInterval'='25','slidingStep'='25','displayWindowBegin'='0','displayWindowEnd'='100')
+ * from root.vehicle.d1" (timeInterval is required, slidingStep/displayWindowBegin/displayWindowEnd
+ * are optional.)
+ */
+public class UDTFM4 implements UDTF {
+
+  enum AccessStrategy {
+    SIZE_WINDOW,
+    TIME_WINDOW
+  }
+
+  protected AccessStrategy accessStrategy;
+  protected TSDataType dataType;
+
+  public static final String WINDOW_SIZE_KEY = "windowSize";
+  public static final String TIME_INTERVAL_KEY = "timeInterval";
+  public static final String SLIDING_STEP_KEY = "slidingStep";
+  public static final String DISPLAY_WINDOW_BEGIN_KEY = "displayWindowBegin";
+  public static final String DISPLAY_WINDOW_END_KEY = "displayWindowEnd";
+
+  @Override
+  public void validate(UDFParameterValidator validator) throws UDFException, MetadataException {
+    validator
+        .validateInputSeriesNumber(1)
+        .validateInputSeriesDataType(
+            0, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE);
+
+    if (!validator.getParameters().hasAttribute(WINDOW_SIZE_KEY)
+        && !validator.getParameters().hasAttribute(TIME_INTERVAL_KEY)) {
+      throw new UDFParameterNotValidException(
+          String.format(
+              "attribute \"%s\"/\"%s\" is required but was not provided.",
+              WINDOW_SIZE_KEY, TIME_INTERVAL_KEY));
+    }
+    if (validator.getParameters().hasAttribute(WINDOW_SIZE_KEY)
+        && validator.getParameters().hasAttribute(TIME_INTERVAL_KEY)) {
+      throw new UDFParameterNotValidException(
+          String.format(
+              "use attribute \"%s\" or \"%s\" only one at a time.",
+              WINDOW_SIZE_KEY, TIME_INTERVAL_KEY));
+    }
+    if (validator.getParameters().hasAttribute(WINDOW_SIZE_KEY)) {
+      accessStrategy = AccessStrategy.SIZE_WINDOW;
+    } else {
+      accessStrategy = AccessStrategy.TIME_WINDOW;
+    }
+
+    dataType = validator.getParameters().getDataType(0);
+  }
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws MetadataException {
+    // set data type
+    configurations.setOutputDataType(dataType);
+
+    // set access strategy
+    if (accessStrategy == AccessStrategy.SIZE_WINDOW) {
+      int windowSize = parameters.getInt(WINDOW_SIZE_KEY);
+      int slidingStep = parameters.getIntOrDefault(SLIDING_STEP_KEY, windowSize);
+      configurations.setAccessStrategy(
+          new SlidingSizeWindowAccessStrategy(windowSize, slidingStep));
+    } else {
+      long timeInterval = parameters.getLong(TIME_INTERVAL_KEY);
+      long displayWindowBegin =
+          parameters.getLongOrDefault(DISPLAY_WINDOW_BEGIN_KEY, Long.MIN_VALUE);
+      long displayWindowEnd = parameters.getLongOrDefault(DISPLAY_WINDOW_END_KEY, Long.MAX_VALUE);
+      long slidingStep = parameters.getLongOrDefault(SLIDING_STEP_KEY, timeInterval);
+      configurations.setAccessStrategy(
+          new SlidingTimeWindowAccessStrategy(
+              timeInterval, slidingStep, displayWindowBegin, displayWindowEnd));
+    }
+  }
+
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector)
+      throws UDFException, IOException {
+    switch (dataType) {
+      case INT32:
+        transformInt(rowWindow, collector);
+        break;
+      case INT64:
+        transformLong(rowWindow, collector);
+        break;
+      case FLOAT:
+        transformFloat(rowWindow, collector);
+        break;
+      case DOUBLE:
+        transformDouble(rowWindow, collector);
+        break;
+      default:
+        // This will not happen
+        throw new UDFInputSeriesDataTypeNotValidException(
+            0, dataType, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE);
+    }
+  }
+
+  public void transformInt(RowWindow rowWindow, PointCollector collector) throws IOException {
+    if (rowWindow.windowSize() > 0) { // else empty window do nothing
+      int firstValue = rowWindow.getRow(0).getInt(0);
+      int lastValue = rowWindow.getRow(rowWindow.windowSize() - 1).getInt(0);
+
+      int minValue = Math.min(firstValue, lastValue);
+      int maxValue = Math.max(firstValue, lastValue);
+      int minIndex = (firstValue < lastValue) ? 0 : rowWindow.windowSize() - 1;
+      int maxIndex = (firstValue > lastValue) ? 0 : rowWindow.windowSize() - 1;
+
+      for (int i = 1; i < rowWindow.windowSize() - 1; i++) {
+        int value = rowWindow.getRow(i).getInt(0);
+        if (value < minValue) {
+          minValue = value;
+          minIndex = i;
+        }
+        if (value > maxValue) {
+          maxValue = value;
+          maxIndex = i;
+        }
+      }
+
+      Row row = rowWindow.getRow(0);
+      collector.putInt(row.getTime(), row.getInt(0));
+
+      int smallerIndex = Math.min(minIndex, maxIndex);
+      int largerIndex = Math.max(minIndex, maxIndex);
+      if (smallerIndex > 0) {
+        row = rowWindow.getRow(smallerIndex);
+        collector.putInt(row.getTime(), row.getInt(0));
+      }
+      if (largerIndex > smallerIndex) {
+        row = rowWindow.getRow(largerIndex);
+        collector.putInt(row.getTime(), row.getInt(0));
+      }
+      if (largerIndex < rowWindow.windowSize() - 1) {
+        row = rowWindow.getRow(rowWindow.windowSize() - 1);
+        collector.putInt(row.getTime(), row.getInt(0));
+      }
+    }
+  }
+
+  public void transformLong(RowWindow rowWindow, PointCollector collector) throws IOException {
+    if (rowWindow.windowSize() > 0) { // else empty window do nothing
+      long firstValue = rowWindow.getRow(0).getLong(0);
+      long lastValue = rowWindow.getRow(rowWindow.windowSize() - 1).getLong(0);
+
+      long minValue = Math.min(firstValue, lastValue);
+      long maxValue = Math.max(firstValue, lastValue);
+      int minIndex = (firstValue < lastValue) ? 0 : rowWindow.windowSize() - 1;
+      int maxIndex = (firstValue > lastValue) ? 0 : rowWindow.windowSize() - 1;
+
+      for (int i = 1; i < rowWindow.windowSize() - 1; i++) {
+        long value = rowWindow.getRow(i).getLong(0);
+        if (value < minValue) {
+          minValue = value;
+          minIndex = i;
+        }
+        if (value > maxValue) {
+          maxValue = value;
+          maxIndex = i;
+        }
+      }
+
+      Row row = rowWindow.getRow(0);
+      collector.putLong(row.getTime(), row.getLong(0));
+
+      int smallerIndex = Math.min(minIndex, maxIndex);
+      int largerIndex = Math.max(minIndex, maxIndex);
+      if (smallerIndex > 0) {
+        row = rowWindow.getRow(smallerIndex);
+        collector.putLong(row.getTime(), row.getLong(0));
+      }
+      if (largerIndex > smallerIndex) {
+        row = rowWindow.getRow(largerIndex);
+        collector.putLong(row.getTime(), row.getLong(0));
+      }
+      if (largerIndex < rowWindow.windowSize() - 1) {
+        row = rowWindow.getRow(rowWindow.windowSize() - 1);
+        collector.putLong(row.getTime(), row.getLong(0));
+      }
+    }
+  }
+
+  public void transformFloat(RowWindow rowWindow, PointCollector collector) throws IOException {
+    if (rowWindow.windowSize() > 0) { // else empty window do nothing
+      float firstValue = rowWindow.getRow(0).getFloat(0);
+      float lastValue = rowWindow.getRow(rowWindow.windowSize() - 1).getFloat(0);
+
+      float minValue = Math.min(firstValue, lastValue);
+      float maxValue = Math.max(firstValue, lastValue);
+      int minIndex = (firstValue < lastValue) ? 0 : rowWindow.windowSize() - 1;
+      int maxIndex = (firstValue > lastValue) ? 0 : rowWindow.windowSize() - 1;
+
+      for (int i = 1; i < rowWindow.windowSize() - 1; i++) {
+        float value = rowWindow.getRow(i).getFloat(0);
+        if (value < minValue) {
+          minValue = value;
+          minIndex = i;
+        }
+        if (value > maxValue) {
+          maxValue = value;
+          maxIndex = i;
+        }
+      }
+
+      Row row = rowWindow.getRow(0);
+      collector.putFloat(row.getTime(), row.getFloat(0));
+
+      int smallerIndex = Math.min(minIndex, maxIndex);
+      int largerIndex = Math.max(minIndex, maxIndex);
+      if (smallerIndex > 0) {
+        row = rowWindow.getRow(smallerIndex);
+        collector.putFloat(row.getTime(), row.getFloat(0));
+      }
+      if (largerIndex > smallerIndex) {
+        row = rowWindow.getRow(largerIndex);
+        collector.putFloat(row.getTime(), row.getFloat(0));
+      }
+      if (largerIndex < rowWindow.windowSize() - 1) {
+        row = rowWindow.getRow(rowWindow.windowSize() - 1);
+        collector.putFloat(row.getTime(), row.getFloat(0));
+      }
+    }
+  }
+
+  public void transformDouble(RowWindow rowWindow, PointCollector collector) throws IOException {
+    if (rowWindow.windowSize() > 0) { // else empty window do nothing
+      double firstValue = rowWindow.getRow(0).getDouble(0);
+      double lastValue = rowWindow.getRow(rowWindow.windowSize() - 1).getDouble(0);
+
+      double minValue = Math.min(firstValue, lastValue);
+      double maxValue = Math.max(firstValue, lastValue);
+      int minIndex = (firstValue < lastValue) ? 0 : rowWindow.windowSize() - 1;
+      int maxIndex = (firstValue > lastValue) ? 0 : rowWindow.windowSize() - 1;
+
+      for (int i = 1; i < rowWindow.windowSize() - 1; i++) {
+        double value = rowWindow.getRow(i).getDouble(0);
+        if (value < minValue) {
+          minValue = value;
+          minIndex = i;
+        }
+        if (value > maxValue) {
+          maxValue = value;
+          maxIndex = i;
+        }
+      }
+
+      Row row = rowWindow.getRow(0);
+      collector.putDouble(row.getTime(), row.getDouble(0));
+
+      int smallerIndex = Math.min(minIndex, maxIndex);
+      int largerIndex = Math.max(minIndex, maxIndex);
+      if (smallerIndex > 0) {
+        row = rowWindow.getRow(smallerIndex);
+        collector.putDouble(row.getTime(), row.getDouble(0));
+      }
+      if (largerIndex > smallerIndex) {
+        row = rowWindow.getRow(largerIndex);
+        collector.putDouble(row.getTime(), row.getDouble(0));
+      }
+      if (largerIndex < rowWindow.windowSize() - 1) {
+        row = rowWindow.getRow(rowWindow.windowSize() - 1);
+        collector.putDouble(row.getTime(), row.getDouble(0));
+      }
+    }
+  }
+}
diff --git a/site/src/main/.vuepress/config.js b/site/src/main/.vuepress/config.js
index 2a7adbed52..9db9d63d31 100644
--- a/site/src/main/.vuepress/config.js
+++ b/site/src/main/.vuepress/config.js
@@ -809,8 +809,7 @@ var config = {
 					        ['UDF-Library/Data-Quality', 'Data Quality'],
 					        ['UDF-Library/Data-Repairing', 'Data Repairing'],
 					        ['UDF-Library/Series-Discovery', 'Series Discovery'],
-                            ['UDF-Library/String-Processing', 'String Processing'],
-                            ['UDF-Library/M4', 'M4']
+                  ['UDF-Library/String-Processing', 'String Processing']
 					    ]
 					},
 					{
@@ -986,8 +985,7 @@ var config = {
 					        ['UDF-Library/Data-Quality', 'Data Quality'],
 					        ['UDF-Library/Data-Repairing', 'Data Repairing'],
 					        ['UDF-Library/Series-Discovery', 'Series Discovery'],
-                            ['UDF-Library/String-Processing', 'String Processing'],
-                            ['UDF-Library/M4', 'M4']
+                  ['UDF-Library/String-Processing', 'String Processing']
 					    ]
 					},
 					{
@@ -1731,8 +1729,7 @@ var config = {
                             ['UDF-Library/Data-Quality', '数据质量'],
                             ['UDF-Library/Data-Repairing', '数据修复'],
                             ['UDF-Library/Series-Discovery', '序列发现'],
-                            ['UDF-Library/String-Processing', '字符串处理'],
-                            ['UDF-Library/M4', 'M4']
+                            ['UDF-Library/String-Processing', '字符串处理']
                         ]
                     },
 					{
@@ -1909,8 +1906,7 @@ var config = {
                             ['UDF-Library/Data-Quality', '数据质量'],
                             ['UDF-Library/Data-Repairing', '数据修复'],
                             ['UDF-Library/Series-Discovery', '序列发现'],
-                            ['UDF-Library/String-Processing', '字符串处理'],
-                            ['UDF-Library/M4', 'M4']
+                            ['UDF-Library/String-Processing', '字符串处理']
                         ]
                     },
 					{