You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/04/22 07:15:45 UTC

[iotdb] branch master updated: [IOTDB-2740] Equal size bucket sampling UDFs: EQUAL_SIZE_BUCKET_RANDOM_SAMPLE, EQUAL_SIZE_BUCKET_AGG_SAMPLE, EQUAL_SIZE_BUCKET_M4_SAMPLE (#5518)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new dc344e5b70 [IOTDB-2740] Equal size bucket sampling UDFs: EQUAL_SIZE_BUCKET_RANDOM_SAMPLE, EQUAL_SIZE_BUCKET_AGG_SAMPLE, EQUAL_SIZE_BUCKET_M4_SAMPLE (#5518)
dc344e5b70 is described below

commit dc344e5b7016d7cf2f3b8a3d0b6d5e3a83bd142a
Author: Hang Zhang <34...@users.noreply.github.com>
AuthorDate: Fri Apr 22 15:15:38 2022 +0800

    [IOTDB-2740] Equal size bucket sampling UDFs: EQUAL_SIZE_BUCKET_RANDOM_SAMPLE, EQUAL_SIZE_BUCKET_AGG_SAMPLE, EQUAL_SIZE_BUCKET_M4_SAMPLE (#5518)
---
 docs/UserGuide/Query-Data/Select-Expression.md     | 143 ++++++
 docs/zh/UserGuide/Query-Data/Select-Expression.md  | 143 ++++++
 .../db/integration/IoTDBUDTFBuiltinFunctionIT.java | 190 ++++++++
 .../db/query/udf/builtin/BuiltinFunction.java      |   5 +-
 .../udf/builtin/UDTFEqualSizeBucketAggSample.java  | 487 +++++++++++++++++++++
 .../udf/builtin/UDTFEqualSizeBucketM4Sample.java   | 238 ++++++++++
 .../builtin/UDTFEqualSizeBucketRandomSample.java   |  68 +++
 .../udf/builtin/UDTFEqualSizeBucketSample.java     |  48 ++
 8 files changed, 1321 insertions(+), 1 deletion(-)

diff --git a/docs/UserGuide/Query-Data/Select-Expression.md b/docs/UserGuide/Query-Data/Select-Expression.md
index 94a34917d0..6ab41d2b46 100644
--- a/docs/UserGuide/Query-Data/Select-Expression.md
+++ b/docs/UserGuide/Query-Data/Select-Expression.md
@@ -526,6 +526,149 @@ Result:
 +-----------------------------+-------------+-------------------------+-----------------------------+----------------------------+--------------------------------+
 ```
 
+### Equal Size Bucket Sample Function
+This function samples the input sequence using equal size buckets: according to the downsampling ratio given by the user, the input sequence is divided into buckets that each contain the same fixed number of points, and the chosen sampling method is then applied within each bucket.
+- `proportion`: sample ratio, the value range is `(0, 1]`.
+#### Equal Size Bucket Random Sample
+Random sampling is performed on the equally divided buckets.
+
+| Function Name | Allowed Input Series Data Types | Required Attributes                           | Output Series Data Type | Description                 |
+|----------|--------------------------------|---------------------------------------|------------|--------------------------------------------------|
+| EQUAL_SIZE_BUCKET_RANDOM_SAMPLE   | INT32 / INT64 / FLOAT / DOUBLE | `proportion` The value range is `(0, 1]`, the default is `0.1` | INT32 / INT64 / FLOAT / DOUBLE | Returns a random sample of equal buckets that matches the sampling ratio |
+
+##### Example
+Example data: `root.ln.wf01.wt01.temperature` contains `100` ordered data points with values from `0.0` to `99.0`.
+```
+IoTDB> select temperature from root.ln.wf01.wt01;
++-----------------------------+-----------------------------+
+|                         Time|root.ln.wf01.wt01.temperature|
++-----------------------------+-----------------------------+
+|1970-01-01T08:00:00.000+08:00|                          0.0|
+|1970-01-01T08:00:00.001+08:00|                          1.0|
+|1970-01-01T08:00:00.002+08:00|                          2.0|
+|1970-01-01T08:00:00.003+08:00|                          3.0|
+|1970-01-01T08:00:00.004+08:00|                          4.0|
+|1970-01-01T08:00:00.005+08:00|                          5.0|
+|1970-01-01T08:00:00.006+08:00|                          6.0|
+|1970-01-01T08:00:00.007+08:00|                          7.0|
+|1970-01-01T08:00:00.008+08:00|                          8.0|
+|1970-01-01T08:00:00.009+08:00|                          9.0|
+|1970-01-01T08:00:00.010+08:00|                         10.0|
+|1970-01-01T08:00:00.011+08:00|                         11.0|
+|1970-01-01T08:00:00.012+08:00|                         12.0|
+|.............................|.............................|            
+|1970-01-01T08:00:00.089+08:00|                         89.0|
+|1970-01-01T08:00:00.090+08:00|                         90.0|
+|1970-01-01T08:00:00.091+08:00|                         91.0|
+|1970-01-01T08:00:00.092+08:00|                         92.0|
+|1970-01-01T08:00:00.093+08:00|                         93.0|
+|1970-01-01T08:00:00.094+08:00|                         94.0|
+|1970-01-01T08:00:00.095+08:00|                         95.0|
+|1970-01-01T08:00:00.096+08:00|                         96.0|
+|1970-01-01T08:00:00.097+08:00|                         97.0|
+|1970-01-01T08:00:00.098+08:00|                         98.0|
+|1970-01-01T08:00:00.099+08:00|                         99.0|
++-----------------------------+-----------------------------+
+```
+Sql:
+```sql
+select equal_size_bucket_random_sample(temperature,'proportion'='0.1') as random_sample from root.ln.wf01.wt01;
+```
+Result:
+```
++-----------------------------+-------------+
+|                         Time|random_sample|
++-----------------------------+-------------+
+|1970-01-01T08:00:00.007+08:00|          7.0|
+|1970-01-01T08:00:00.014+08:00|         14.0|
+|1970-01-01T08:00:00.020+08:00|         20.0|
+|1970-01-01T08:00:00.035+08:00|         35.0|
+|1970-01-01T08:00:00.047+08:00|         47.0|
+|1970-01-01T08:00:00.059+08:00|         59.0|
+|1970-01-01T08:00:00.063+08:00|         63.0|
+|1970-01-01T08:00:00.079+08:00|         79.0|
+|1970-01-01T08:00:00.086+08:00|         86.0|
+|1970-01-01T08:00:00.096+08:00|         96.0|
++-----------------------------+-------------+
+Total line number = 10
+It costs 0.024s
+```
+
+#### Equal Size Bucket Aggregation Sample
+
+The input sequence is sampled by applying an aggregation within each bucket. The user may additionally provide the aggregation type:
+- `type`: Aggregation type, which can be `avg`, `max`, `min`, `sum`, `extreme` or `variance`. By default, `avg` is used. `extreme` returns the value with the largest absolute value in each bucket. `variance` returns the variance of the values in each bucket.
+
+The timestamp of the sampling output of each bucket is the timestamp of the first point of the bucket.
+
+| Function Name | Allowed Input Series Data Types | Required Attributes                           | Output Series Data Type | Description                 |
+|----------|--------------------------------|---------------------------------------|------------|--------------------------------------------------|
+| EQUAL_SIZE_BUCKET_AGG_SAMPLE | INT32 / INT64 / FLOAT / DOUBLE | `proportion` The value range is `(0, 1]`, the default is `0.1`</br>`type`: The value types are `avg`, `max`, `min`, `sum`, `extreme`, `variance`, the default is `avg` | INT32 / INT64 / FLOAT / DOUBLE | Returns equal bucket aggregation samples that match the sampling ratio |
+
+##### Example
+Example data: `root.ln.wf01.wt01.temperature` contains `100` ordered data points with values from `0.0` to `99.0`, the same test data as in the equal size bucket random sample example.
+
+Sql:
+```sql
+select equal_size_bucket_agg_sample(temperature, 'type'='avg','proportion'='0.1') as agg_avg, equal_size_bucket_agg_sample(temperature, 'type'='max','proportion'='0.1') as agg_max, equal_size_bucket_agg_sample(temperature,'type'='min','proportion'='0.1') as agg_min, equal_size_bucket_agg_sample(temperature, 'type'='sum','proportion'='0.1') as agg_sum, equal_size_bucket_agg_sample(temperature, 'type'='extreme','proportion'='0.1') as agg_extreme, equal_size_bucket_agg_sample(temperature, ' [...]
+```
+Result:
+```
++-----------------------------+-----------------+-------+-------+-------+-----------+------------+
+|                         Time|          agg_avg|agg_max|agg_min|agg_sum|agg_extreme|agg_variance|
++-----------------------------+-----------------+-------+-------+-------+-----------+------------+
+|1970-01-01T08:00:00.000+08:00|              4.5|    9.0|    0.0|   45.0|        9.0|        8.25|
+|1970-01-01T08:00:00.010+08:00|             14.5|   19.0|   10.0|  145.0|       19.0|        8.25|
+|1970-01-01T08:00:00.020+08:00|             24.5|   29.0|   20.0|  245.0|       29.0|        8.25|
+|1970-01-01T08:00:00.030+08:00|             34.5|   39.0|   30.0|  345.0|       39.0|        8.25|
+|1970-01-01T08:00:00.040+08:00|             44.5|   49.0|   40.0|  445.0|       49.0|        8.25|
+|1970-01-01T08:00:00.050+08:00|             54.5|   59.0|   50.0|  545.0|       59.0|        8.25|
+|1970-01-01T08:00:00.060+08:00|             64.5|   69.0|   60.0|  645.0|       69.0|        8.25|
+|1970-01-01T08:00:00.070+08:00|74.50000000000001|   79.0|   70.0|  745.0|       79.0|        8.25|
+|1970-01-01T08:00:00.080+08:00|             84.5|   89.0|   80.0|  845.0|       89.0|        8.25|
+|1970-01-01T08:00:00.090+08:00|             94.5|   99.0|   90.0|  945.0|       99.0|        8.25|
++-----------------------------+-----------------+-------+-------+-------+-----------+------------+
+Total line number = 10
+It costs 0.044s
+```
+#### Equal Size Bucket M4 Sample
+
+The input sequence is sampled using the M4 sampling method, i.e. the first, last, minimum and maximum points of each bucket are sampled.
+
+| Function Name | Allowed Input Series Data Types | Required Attributes                           | Output Series Data Type | Description                 |
+|----------|--------------------------------|---------------------------------------|------------|--------------------------------------------------|
+| EQUAL_SIZE_BUCKET_M4_SAMPLE | INT32 / INT64 / FLOAT / DOUBLE | `proportion` The value range is `(0, 1]`, the default is `0.1` | INT32 / INT64 / FLOAT / DOUBLE | Returns equal bucket M4 samples that match the sampling ratio |
+
+
+##### Example
+Example data: `root.ln.wf01.wt01.temperature` contains `100` ordered data points with values from `0.0` to `99.0`, the same test data as in the equal size bucket random sample example.
+
+Sql:
+```sql
+select equal_size_bucket_m4_sample(temperature, 'proportion'='0.1') as M4_sample from root.ln.wf01.wt01;
+```
+Result:
+```
++-----------------------------+---------+
+|                         Time|M4_sample|
++-----------------------------+---------+
+|1970-01-01T08:00:00.000+08:00|      0.0|
+|1970-01-01T08:00:00.001+08:00|      1.0|
+|1970-01-01T08:00:00.038+08:00|     38.0|
+|1970-01-01T08:00:00.039+08:00|     39.0|
+|1970-01-01T08:00:00.040+08:00|     40.0|
+|1970-01-01T08:00:00.041+08:00|     41.0|
+|1970-01-01T08:00:00.078+08:00|     78.0|
+|1970-01-01T08:00:00.079+08:00|     79.0|
+|1970-01-01T08:00:00.080+08:00|     80.0|
+|1970-01-01T08:00:00.081+08:00|     81.0|
+|1970-01-01T08:00:00.098+08:00|     98.0|
+|1970-01-01T08:00:00.099+08:00|     99.0|
++-----------------------------+---------+
+Total line number = 12
+It costs 0.065s
+```
+
 ### User Defined Timeseries Generating Functions
 
 Please refer to [UDF (User Defined Function)](../Process-Data/UDF-User-Defined-Function.md).
diff --git a/docs/zh/UserGuide/Query-Data/Select-Expression.md b/docs/zh/UserGuide/Query-Data/Select-Expression.md
index 9236764879..0e1ca6f8ab 100644
--- a/docs/zh/UserGuide/Query-Data/Select-Expression.md
+++ b/docs/zh/UserGuide/Query-Data/Select-Expression.md
@@ -529,6 +529,149 @@ select s1, zero_count(s1), non_zero_count(s2), zero_duration(s3), non_zero_durat
 +-----------------------------+-------------+-------------------------+-----------------------------+----------------------------+--------------------------------+
 ```
 
+### 等数量分桶降采样函数
+本函数对输入序列进行等数量分桶采样,即根据用户给定的降采样比例和降采样方法将输入序列按固定点数等分为若干桶。在每个桶内通过给定的采样方法进行采样。
+- `proportion`:降采样比例,取值区间为`(0, 1]`。
+#### 等数量分桶随机采样
+对等数量分桶后,桶内进行随机采样。
+
+| 函数名      | 可接收的输入序列类型                     | 必要的属性参数                               | 输出序列类型     | 功能类型                                             |
+|----------|--------------------------------|---------------------------------------|------------|--------------------------------------------------|
+| EQUAL_SIZE_BUCKET_RANDOM_SAMPLE   | INT32 / INT64 / FLOAT / DOUBLE | `proportion`取值范围为`(0, 1]`,默认为`0.1`  | INT32 / INT64 / FLOAT / DOUBLE | 返回符合采样比例的等分桶随机采样                |
+
+##### 演示
+测试数据:`root.ln.wf01.wt01.temperature`从`0.0-99.0`共`100`条有序数据。
+```
+IoTDB> select temperature from root.ln.wf01.wt01;
++-----------------------------+-----------------------------+
+|                         Time|root.ln.wf01.wt01.temperature|
++-----------------------------+-----------------------------+
+|1970-01-01T08:00:00.000+08:00|                          0.0|
+|1970-01-01T08:00:00.001+08:00|                          1.0|
+|1970-01-01T08:00:00.002+08:00|                          2.0|
+|1970-01-01T08:00:00.003+08:00|                          3.0|
+|1970-01-01T08:00:00.004+08:00|                          4.0|
+|1970-01-01T08:00:00.005+08:00|                          5.0|
+|1970-01-01T08:00:00.006+08:00|                          6.0|
+|1970-01-01T08:00:00.007+08:00|                          7.0|
+|1970-01-01T08:00:00.008+08:00|                          8.0|
+|1970-01-01T08:00:00.009+08:00|                          9.0|
+|1970-01-01T08:00:00.010+08:00|                         10.0|
+|1970-01-01T08:00:00.011+08:00|                         11.0|
+|1970-01-01T08:00:00.012+08:00|                         12.0|
+|.............................|.............................|            
+|1970-01-01T08:00:00.089+08:00|                         89.0|
+|1970-01-01T08:00:00.090+08:00|                         90.0|
+|1970-01-01T08:00:00.091+08:00|                         91.0|
+|1970-01-01T08:00:00.092+08:00|                         92.0|
+|1970-01-01T08:00:00.093+08:00|                         93.0|
+|1970-01-01T08:00:00.094+08:00|                         94.0|
+|1970-01-01T08:00:00.095+08:00|                         95.0|
+|1970-01-01T08:00:00.096+08:00|                         96.0|
+|1970-01-01T08:00:00.097+08:00|                         97.0|
+|1970-01-01T08:00:00.098+08:00|                         98.0|
+|1970-01-01T08:00:00.099+08:00|                         99.0|
++-----------------------------+-----------------------------+
+```
+sql:
+```sql
+select equal_size_bucket_random_sample(temperature,'proportion'='0.1') as random_sample from root.ln.wf01.wt01;
+```
+结果:
+```
++-----------------------------+-------------+
+|                         Time|random_sample|
++-----------------------------+-------------+
+|1970-01-01T08:00:00.007+08:00|          7.0|
+|1970-01-01T08:00:00.014+08:00|         14.0|
+|1970-01-01T08:00:00.020+08:00|         20.0|
+|1970-01-01T08:00:00.035+08:00|         35.0|
+|1970-01-01T08:00:00.047+08:00|         47.0|
+|1970-01-01T08:00:00.059+08:00|         59.0|
+|1970-01-01T08:00:00.063+08:00|         63.0|
+|1970-01-01T08:00:00.079+08:00|         79.0|
+|1970-01-01T08:00:00.086+08:00|         86.0|
+|1970-01-01T08:00:00.096+08:00|         96.0|
++-----------------------------+-------------+
+Total line number = 10
+It costs 0.024s
+```
+
+#### 等数量分桶聚合采样
+
+采用聚合采样法对输入序列进行采样,用户需要另外提供一个聚合函数参数,即:
+- `type`:聚合类型,取值为`avg`或`max`或`min`或`sum`或`extreme`或`variance`。在缺省情况下,采用`avg`。其中`extreme`表示等分桶中,绝对值最大的值。`variance`表示采样等分桶中的方差。
+
+每个桶采样输出的时间戳为这个桶第一个点的时间戳。
+
+
+| 函数名      | 可接收的输入序列类型                     | 必要的属性参数                               | 输出序列类型     | 功能类型                                             |
+|----------|--------------------------------|---------------------------------------|------------|--------------------------------------------------|
+| EQUAL_SIZE_BUCKET_AGG_SAMPLE   | INT32 / INT64 / FLOAT / DOUBLE | `proportion`取值范围为`(0, 1]`,默认为`0.1`</br>`type`:取值类型有`avg`, `max`, `min`, `sum`, `extreme`, `variance`, 默认为`avg`  | INT32 / INT64 / FLOAT / DOUBLE | 返回符合采样比例的等分桶聚合采样                |
+
+##### 演示
+测试数据:`root.ln.wf01.wt01.temperature`从`0.0-99.0`共`100`条有序数据,同等分桶随机采样的测试数据。
+
+sql:
+```sql
+select equal_size_bucket_agg_sample(temperature, 'type'='avg','proportion'='0.1') as agg_avg, equal_size_bucket_agg_sample(temperature, 'type'='max','proportion'='0.1') as agg_max, equal_size_bucket_agg_sample(temperature,'type'='min','proportion'='0.1') as agg_min, equal_size_bucket_agg_sample(temperature, 'type'='sum','proportion'='0.1') as agg_sum, equal_size_bucket_agg_sample(temperature, 'type'='extreme','proportion'='0.1') as agg_extreme, equal_size_bucket_agg_sample(temperature, ' [...]
+```
+结果:
+```
++-----------------------------+-----------------+-------+-------+-------+-----------+------------+
+|                         Time|          agg_avg|agg_max|agg_min|agg_sum|agg_extreme|agg_variance|
++-----------------------------+-----------------+-------+-------+-------+-----------+------------+
+|1970-01-01T08:00:00.000+08:00|              4.5|    9.0|    0.0|   45.0|        9.0|        8.25|
+|1970-01-01T08:00:00.010+08:00|             14.5|   19.0|   10.0|  145.0|       19.0|        8.25|
+|1970-01-01T08:00:00.020+08:00|             24.5|   29.0|   20.0|  245.0|       29.0|        8.25|
+|1970-01-01T08:00:00.030+08:00|             34.5|   39.0|   30.0|  345.0|       39.0|        8.25|
+|1970-01-01T08:00:00.040+08:00|             44.5|   49.0|   40.0|  445.0|       49.0|        8.25|
+|1970-01-01T08:00:00.050+08:00|             54.5|   59.0|   50.0|  545.0|       59.0|        8.25|
+|1970-01-01T08:00:00.060+08:00|             64.5|   69.0|   60.0|  645.0|       69.0|        8.25|
+|1970-01-01T08:00:00.070+08:00|74.50000000000001|   79.0|   70.0|  745.0|       79.0|        8.25|
+|1970-01-01T08:00:00.080+08:00|             84.5|   89.0|   80.0|  845.0|       89.0|        8.25|
+|1970-01-01T08:00:00.090+08:00|             94.5|   99.0|   90.0|  945.0|       99.0|        8.25|
++-----------------------------+-----------------+-------+-------+-------+-----------+------------+
+Total line number = 10
+It costs 0.044s
+```
+#### 等数量分桶M4采样
+
+采用M4采样法对输入序列进行采样。即对于每个桶采样首、尾、最小和最大值。
+
+| 函数名      | 可接收的输入序列类型                     | 必要的属性参数                               | 输出序列类型     | 功能类型                                             |
+|----------|--------------------------------|---------------------------------------|------------|--------------------------------------------------|
+| EQUAL_SIZE_BUCKET_M4_SAMPLE   | INT32 / INT64 / FLOAT / DOUBLE | `proportion`取值范围为`(0, 1]`,默认为`0.1`| INT32 / INT64 / FLOAT / DOUBLE | 返回符合采样比例的等分桶M4采样                |
+
+##### 演示
+测试数据:`root.ln.wf01.wt01.temperature`从`0.0-99.0`共`100`条有序数据,同等分桶随机采样的测试数据。
+
+sql:
+```sql
+select equal_size_bucket_m4_sample(temperature, 'proportion'='0.1') as M4_sample from root.ln.wf01.wt01;
+```
+结果:
+```
++-----------------------------+---------+
+|                         Time|M4_sample|
++-----------------------------+---------+
+|1970-01-01T08:00:00.000+08:00|      0.0|
+|1970-01-01T08:00:00.001+08:00|      1.0|
+|1970-01-01T08:00:00.038+08:00|     38.0|
+|1970-01-01T08:00:00.039+08:00|     39.0|
+|1970-01-01T08:00:00.040+08:00|     40.0|
+|1970-01-01T08:00:00.041+08:00|     41.0|
+|1970-01-01T08:00:00.078+08:00|     78.0|
+|1970-01-01T08:00:00.079+08:00|     79.0|
+|1970-01-01T08:00:00.080+08:00|     80.0|
+|1970-01-01T08:00:00.081+08:00|     81.0|
+|1970-01-01T08:00:00.098+08:00|     98.0|
+|1970-01-01T08:00:00.099+08:00|     99.0|
++-----------------------------+---------+
+Total line number = 12
+It costs 0.065s
+```
+
 ### 自定义时间序列生成函数
 
 请参考 [UDF](../Process-Data/UDF-User-Defined-Function.md)。
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBUDTFBuiltinFunctionIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBUDTFBuiltinFunctionIT.java
index 974ce5b3e4..bf75cdc5f0 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBUDTFBuiltinFunctionIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBUDTFBuiltinFunctionIT.java
@@ -577,4 +577,194 @@ public class IoTDBUDTFBuiltinFunctionIT {
       assertTrue(e.getMessage().contains("Upper can not be smaller than lower."));
     }
   }
+
+  @Test
+  public void testEqualBucketSampleForRandom() { // 100 points, proportion 0.1 -> expect 10 rows
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("CREATE TIMESERIES root.sg.d5.s1 with datatype=INT32,encoding=PLAIN");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+    String[] SQL_FOR_SAMPLE_S1 = new String[100];
+    for (int i = 0; i < 100; i++) {
+      SQL_FOR_SAMPLE_S1[i] =
+          String.format("insert into root.sg.d5(time, s1) values (%d, %d)", i, i);
+    }
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      for (int i = 0; i < 100; i++) {
+        statement.execute(SQL_FOR_SAMPLE_S1[i]);
+      }
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      String functionName = "EQUAL_SIZE_BUCKET_RANDOM_SAMPLE";
+      double proportionValue = 0.1; // formatted via %s: Double.toString is locale-independent
+      ResultSet resultSet =
+          statement.executeQuery(
+              String.format(
+                  "select %s(s1, 'proportion'='%s') from root.sg.d5",
+                  functionName, proportionValue));
+      int columnCount = resultSet.getMetaData().getColumnCount();
+      assertEquals(1 + 1, columnCount);
+      int count = 0;
+      while (resultSet.next()) {
+        count++;
+      }
+      assertEquals(10, count);
+    } catch (Exception e) {
+      fail(e.getMessage()); // a failing query must fail the test, not be silently logged
+    }
+  }
+
+  @Test
+  public void testEqualBucketSampleForAgg() { // one series per aggregation type; 10 buckets each
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("CREATE TIMESERIES root.sg.d4.s1 with datatype=FLOAT,encoding=PLAIN");
+      statement.execute("CREATE TIMESERIES root.sg.d4.s2 with datatype=DOUBLE,encoding=PLAIN");
+      statement.execute("CREATE TIMESERIES root.sg.d4.s3 with datatype=INT64,encoding=PLAIN");
+      statement.execute("CREATE TIMESERIES root.sg.d4.s4 with datatype=INT32,encoding=PLAIN");
+      statement.execute("CREATE TIMESERIES root.sg.d4.s5 with datatype=INT32,encoding=PLAIN");
+      statement.execute("CREATE TIMESERIES root.sg.d4.s6 with datatype=DOUBLE,encoding=PLAIN");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+    String[] SQL_FOR_SAMPLE_S1 = new String[100];
+    String[] SQL_FOR_SAMPLE_S2 = new String[100];
+    String[] SQL_FOR_SAMPLE_S3 = new String[100];
+    String[] SQL_FOR_SAMPLE_S4 = new String[100];
+    String[] SQL_FOR_SAMPLE_S5 = new String[100];
+    String[] SQL_FOR_SAMPLE_S6 = new String[100];
+
+    for (int i = 0; i < 100; i++) { // %s (not %f) keeps decimal literals locale-independent
+      SQL_FOR_SAMPLE_S1[i] =
+          String.format("insert into root.sg.d4(time, s1) values (%d, %s)", i, i * 1.0);
+      SQL_FOR_SAMPLE_S2[i] =
+          String.format("insert into root.sg.d4(time, s2) values (%d, %s)", i, i * 1.0);
+      SQL_FOR_SAMPLE_S3[i] =
+          String.format("insert into root.sg.d4(time, s3) values (%d, %d)", i, i);
+      SQL_FOR_SAMPLE_S4[i] =
+          String.format("insert into root.sg.d4(time, s4) values (%d, %d)", i, i);
+      SQL_FOR_SAMPLE_S5[i] =
+          String.format("insert into root.sg.d4(time, s5) values (%d, %d)", i, -i);
+      SQL_FOR_SAMPLE_S6[i] =
+          String.format("insert into root.sg.d4(time, s6) values (%d, %s)", i, i * 1.0);
+    }
+    float[] ANSWER1 =
+        new float[] {4.5F, 14.5F, 24.5F, 34.5F, 44.5F, 54.5F, 64.5F, 74.5F, 84.5F, 94.5F};
+    double[] ANSWER2 = new double[] {0, 10, 20, 30, 40, 50, 60, 70, 80, 90};
+    long[] ANSWER3 = new long[] {9, 19, 29, 39, 49, 59, 69, 79, 89, 99};
+    long[] ANSWER4 = new long[] {45, 145, 245, 345, 445, 545, 645, 745, 845, 945};
+    int[] ANSWER5 = new int[] {-9, -19, -29, -39, -49, -59, -69, -79, -89, -99};
+    double[] ANSWER6 = new double[] {8.25, 8.25, 8.25, 8.25, 8.25, 8.25, 8.25, 8.25, 8.25, 8.25};
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      for (int i = 0; i < 100; i++) {
+        statement.execute(SQL_FOR_SAMPLE_S1[i]);
+        statement.execute(SQL_FOR_SAMPLE_S2[i]);
+        statement.execute(SQL_FOR_SAMPLE_S3[i]);
+        statement.execute(SQL_FOR_SAMPLE_S4[i]);
+        statement.execute(SQL_FOR_SAMPLE_S5[i]);
+        statement.execute(SQL_FOR_SAMPLE_S6[i]);
+      }
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      String functionName = "EQUAL_SIZE_BUCKET_AGG_SAMPLE";
+      double proportionValue = 0.1;
+      ResultSet resultSet =
+          statement.executeQuery(
+              String.format(
+                  "select "
+                      + "%s(s1, 'proportion'='%s'), "
+                      + "%s(s2, 'type'='%s', 'proportion'='%s'), "
+                      + "%s(s3, 'type'='%s', 'proportion'='%s'), "
+                      + "%s(s4, 'type'='%s', 'proportion'='%s'), "
+                      + "%s(s5, 'type'='%s', 'proportion'='%s'), "
+                      + "%s(s6, 'type'='%s', 'proportion'='%s')"
+                      + " from root.sg.d4",
+                  functionName,
+                  proportionValue,
+                  functionName,
+                  "min",
+                  proportionValue,
+                  functionName,
+                  "max",
+                  proportionValue,
+                  functionName,
+                  "sum",
+                  proportionValue,
+                  functionName,
+                  "extreme",
+                  proportionValue,
+                  functionName,
+                  "variance",
+                  proportionValue));
+      int columnCount = resultSet.getMetaData().getColumnCount();
+      assertEquals(1 + 6, columnCount);
+      for (int i = 0; i < 10; i++) {
+        assertTrue(resultSet.next()); // fewer rows than buckets must fail the test
+        assertEquals(ANSWER1[i], resultSet.getDouble(2), 0.01);
+        assertEquals(ANSWER2[i], resultSet.getDouble(3), 0.01);
+        assertEquals(ANSWER3[i], resultSet.getLong(4));
+        assertEquals(ANSWER4[i], resultSet.getLong(5));
+        assertEquals(ANSWER5[i], resultSet.getInt(6));
+        assertEquals(ANSWER6[i], resultSet.getDouble(7), 0.01);
+      }
+    } catch (Exception e) {
+      fail(e.getMessage()); // a failing query must fail the test, not be silently logged
+    }
+  }
+
+  @Test
+  public void testEqualBucketSampleForM4() { // values are time + 1; expects first/last/min/max
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("CREATE TIMESERIES root.sg.d3.s1 with datatype=INT32,encoding=PLAIN");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+    String[] SQL_FOR_SAMPLE = new String[100];
+    for (int i = 0; i < 100; i++) {
+      SQL_FOR_SAMPLE[i] =
+          String.format("insert into root.sg.d3(time, s1) values (%d, %d)", i, i + 1);
+    }
+    int[] ANSWER1 = new int[] {1, 2, 39, 40, 41, 42, 79, 80, 81, 82, 99, 100};
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      for (String dataGenerationSql : SQL_FOR_SAMPLE) {
+        statement.execute(dataGenerationSql);
+      }
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      String functionName = "EQUAL_SIZE_BUCKET_M4_SAMPLE";
+      String methodName = "m4"; // NOTE(review): 'method' may be ignored by this UDF — confirm
+      double proportionValue = 0.1; // formatted via %s: Double.toString is locale-independent
+      ResultSet resultSet =
+          statement.executeQuery(
+              String.format(
+                  "select %s(s1, 'method'='%s', 'proportion'='%s') from root.sg.d3",
+                  functionName, methodName, proportionValue));
+      int columnCount = resultSet.getMetaData().getColumnCount();
+      assertEquals(1 + 1, columnCount);
+      for (int j : ANSWER1) {
+        assertTrue(resultSet.next()); // fewer rows than expected must fail the test
+        assertEquals(j, resultSet.getInt(2));
+      }
+    } catch (Exception e) {
+      fail(e.getMessage()); // a failing query must fail the test, not be silently logged
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java
index d4111a54b6..e54e86a821 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java
@@ -60,7 +60,10 @@ public enum BuiltinFunction {
   NON_ZERO_DURATION("NON_ZERO_DURATION", UDTFNonZeroDuration.class),
   ZERO_COUNT("ZERO_COUNT", UDTFZeroCount.class),
   NON_ZERO_COUNT("NON_ZERO_COUNT", UDTFNonZeroCount.class),
-  ;
+  EQUAL_SIZE_BUCKET_RANDOM_SAMPLE(
+      "EQUAL_SIZE_BUCKET_RANDOM_SAMPLE", UDTFEqualSizeBucketRandomSample.class),
+  EQUAL_SIZE_BUCKET_AGG_SAMPLE("EQUAL_SIZE_BUCKET_AGG_SAMPLE", UDTFEqualSizeBucketAggSample.class),
+  EQUAL_SIZE_BUCKET_M4_SAMPLE("EQUAL_SIZE_BUCKET_M4_SAMPLE", UDTFEqualSizeBucketM4Sample.class);
 
   private final String functionName;
   private final Class<?> functionClass;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketAggSample.java b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketAggSample.java
new file mode 100644
index 0000000000..6ea16ae606
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketAggSample.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.builtin;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.query.udf.api.access.RowWindow;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.exception.UDFException;
+import org.apache.iotdb.db.query.udf.api.exception.UDFInputSeriesDataTypeNotValidException;
+import org.apache.iotdb.db.query.udf.api.exception.UDFParameterNotValidException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.io.IOException;
+
+/**
+ * Equal-size bucket aggregation sampling: for every window of {@code bucketSize} rows (configured
+ * through {@link SlidingSizeWindowAccessStrategy}) it emits one aggregated point, stamped with the
+ * timestamp of the window's first row.
+ *
+ * <p>The aggregation is selected by the {@code type} parameter (case-insensitive, default {@code
+ * avg}): avg, min, max, sum, extreme or variance. avg and variance always output DOUBLE; all other
+ * types output the input series' data type.
+ */
+public class UDTFEqualSizeBucketAggSample extends UDTFEqualSizeBucketSample {
+
+  /** Lower-cased value of the {@code type} parameter, set in {@link #validate}. */
+  private String aggMethodType;
+  /** Aggregation strategy chosen in {@link #beforeStart} from {@link #aggMethodType}. */
+  private Aggregator aggregator;
+
+  /**
+   * Reduces all rows of one window (column 0) to a single point written to the collector; one
+   * method per supported input data type.
+   */
+  private interface Aggregator {
+
+    void aggregateInt(RowWindow rowWindow, PointCollector collector) throws IOException;
+
+    void aggregateLong(RowWindow rowWindow, PointCollector collector) throws IOException;
+
+    void aggregateFloat(RowWindow rowWindow, PointCollector collector) throws IOException;
+
+    void aggregateDouble(RowWindow rowWindow, PointCollector collector) throws IOException;
+  }
+
+  /** Arithmetic mean of the window, always emitted as DOUBLE. */
+  private static class AvgAggregator implements Aggregator {
+    @Override
+    public void aggregateInt(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      double sum = 0;
+      for (int i = 0; i < windowSize; i++) {
+        sum += rowWindow.getRow(i).getInt(0);
+      }
+      collector.putDouble(time, sum / windowSize);
+    }
+
+    @Override
+    public void aggregateLong(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      double sum = 0;
+      for (int i = 0; i < windowSize; i++) {
+        sum += rowWindow.getRow(i).getLong(0);
+      }
+      collector.putDouble(time, sum / windowSize);
+    }
+
+    @Override
+    public void aggregateFloat(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      double sum = 0;
+      for (int i = 0; i < windowSize; i++) {
+        sum += rowWindow.getRow(i).getFloat(0);
+      }
+      collector.putDouble(time, sum / windowSize);
+    }
+
+    @Override
+    public void aggregateDouble(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      double sum = 0;
+      for (int i = 0; i < windowSize; i++) {
+        sum += rowWindow.getRow(i).getDouble(0);
+      }
+      collector.putDouble(time, sum / windowSize);
+    }
+  }
+
+  /** Minimum value of the window, emitted in the input data type. */
+  private static class MinAggregator implements Aggregator {
+    @Override
+    public void aggregateInt(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      int minValue = rowWindow.getRow(0).getInt(0);
+      for (int i = 1; i < windowSize; i++) {
+        int value = rowWindow.getRow(i).getInt(0);
+        if (minValue > value) {
+          minValue = value;
+        }
+      }
+      collector.putInt(time, minValue);
+    }
+
+    @Override
+    public void aggregateLong(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      long minValue = rowWindow.getRow(0).getLong(0);
+      for (int i = 1; i < windowSize; i++) {
+        long value = rowWindow.getRow(i).getLong(0);
+        if (minValue > value) {
+          minValue = value;
+        }
+      }
+      collector.putLong(time, minValue);
+    }
+
+    @Override
+    public void aggregateFloat(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      float minValue = rowWindow.getRow(0).getFloat(0);
+      for (int i = 1; i < windowSize; i++) {
+        float value = rowWindow.getRow(i).getFloat(0);
+        if (minValue > value) {
+          minValue = value;
+        }
+      }
+      collector.putFloat(time, minValue);
+    }
+
+    @Override
+    public void aggregateDouble(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      double minValue = rowWindow.getRow(0).getDouble(0);
+      for (int i = 1; i < windowSize; i++) {
+        double value = rowWindow.getRow(i).getDouble(0);
+        if (minValue > value) {
+          minValue = value;
+        }
+      }
+      collector.putDouble(time, minValue);
+    }
+  }
+
+  /** Maximum value of the window, emitted in the input data type. */
+  private static class MaxAggregator implements Aggregator {
+    @Override
+    public void aggregateInt(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      int maxValue = rowWindow.getRow(0).getInt(0);
+      for (int i = 1; i < windowSize; i++) {
+        int value = rowWindow.getRow(i).getInt(0);
+        if (maxValue < value) {
+          maxValue = value;
+        }
+      }
+      collector.putInt(time, maxValue);
+    }
+
+    @Override
+    public void aggregateLong(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      long maxValue = rowWindow.getRow(0).getLong(0);
+      for (int i = 1; i < windowSize; i++) {
+        long value = rowWindow.getRow(i).getLong(0);
+        if (maxValue < value) {
+          maxValue = value;
+        }
+      }
+      collector.putLong(time, maxValue);
+    }
+
+    @Override
+    public void aggregateFloat(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      float maxValue = rowWindow.getRow(0).getFloat(0);
+      for (int i = 1; i < windowSize; i++) {
+        float value = rowWindow.getRow(i).getFloat(0);
+        if (maxValue < value) {
+          maxValue = value;
+        }
+      }
+      collector.putFloat(time, maxValue);
+    }
+
+    @Override
+    public void aggregateDouble(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      double maxValue = rowWindow.getRow(0).getDouble(0);
+      for (int i = 1; i < windowSize; i++) {
+        double value = rowWindow.getRow(i).getDouble(0);
+        if (maxValue < value) {
+          maxValue = value;
+        }
+      }
+      collector.putDouble(time, maxValue);
+    }
+  }
+
+  /**
+   * Sum of the window, emitted in the input data type. INT32/INT64 sums are accumulated in the
+   * input type and therefore wrap around on overflow.
+   */
+  private static class SumAggregator implements Aggregator {
+    @Override
+    public void aggregateInt(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      int sum = 0;
+      for (int i = 0; i < windowSize; i++) {
+        sum += rowWindow.getRow(i).getInt(0);
+      }
+      collector.putInt(time, sum);
+    }
+
+    @Override
+    public void aggregateLong(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      long sum = 0;
+      for (int i = 0; i < windowSize; i++) {
+        sum += rowWindow.getRow(i).getLong(0);
+      }
+      collector.putLong(time, sum);
+    }
+
+    @Override
+    public void aggregateFloat(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      float sum = 0;
+      for (int i = 0; i < windowSize; i++) {
+        sum += rowWindow.getRow(i).getFloat(0);
+      }
+      collector.putFloat(time, sum);
+    }
+
+    @Override
+    public void aggregateDouble(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      double sum = 0;
+      for (int i = 0; i < windowSize; i++) {
+        sum += rowWindow.getRow(i).getDouble(0);
+      }
+      collector.putDouble(time, sum);
+    }
+  }
+
+  /**
+   * Emits the window value with the largest absolute value, keeping its original sign. On ties the
+   * earliest such value wins; a window of zeros yields 0.
+   *
+   * <p>The running magnitude is tracked separately from the signed extreme: comparing a new
+   * magnitude against the signed extreme (as before) let any later value replace a previously
+   * selected negative extreme, e.g. {@code [-5, 3]} produced 3 instead of -5.
+   */
+  private static class ExtremeAggregator implements Aggregator {
+    @Override
+    public void aggregateInt(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      int extreme = 0;
+      // Magnitudes are computed in long so |Integer.MIN_VALUE| does not overflow.
+      long maxAbs = 0;
+      for (int i = 0; i < windowSize; i++) {
+        int origin = rowWindow.getRow(i).getInt(0);
+        long abs = Math.abs((long) origin);
+        if (maxAbs < abs) {
+          maxAbs = abs;
+          extreme = origin;
+        }
+      }
+      collector.putInt(time, extreme);
+    }
+
+    @Override
+    public void aggregateLong(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      long extreme = 0;
+      // Math.abs(Long.MIN_VALUE) stays Long.MIN_VALUE, which read as unsigned is 2^63 -- the
+      // correct magnitude -- so magnitudes are compared as unsigned values.
+      long maxAbs = 0;
+      for (int i = 0; i < windowSize; i++) {
+        long origin = rowWindow.getRow(i).getLong(0);
+        long abs = Math.abs(origin);
+        if (Long.compareUnsigned(maxAbs, abs) < 0) {
+          maxAbs = abs;
+          extreme = origin;
+        }
+      }
+      collector.putLong(time, extreme);
+    }
+
+    @Override
+    public void aggregateFloat(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      float extreme = 0;
+      float maxAbs = 0;
+      for (int i = 0; i < windowSize; i++) {
+        float origin = rowWindow.getRow(i).getFloat(0);
+        float abs = Math.abs(origin);
+        if (maxAbs < abs) {
+          maxAbs = abs;
+          extreme = origin;
+        }
+      }
+      collector.putFloat(time, extreme);
+    }
+
+    @Override
+    public void aggregateDouble(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      double extreme = 0;
+      double maxAbs = 0;
+      for (int i = 0; i < windowSize; i++) {
+        double origin = rowWindow.getRow(i).getDouble(0);
+        double abs = Math.abs(origin);
+        if (maxAbs < abs) {
+          maxAbs = abs;
+          extreme = origin;
+        }
+      }
+      collector.putDouble(time, extreme);
+    }
+  }
+
+  /**
+   * Population variance of the window (sum of squared deviations from the mean divided by the
+   * window size), computed in two passes and always emitted as DOUBLE.
+   */
+  private static class VarianceAggregator implements Aggregator {
+    @Override
+    public void aggregateInt(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      double avg = 0, sum = 0;
+      for (int i = 0; i < windowSize; i++) {
+        avg += rowWindow.getRow(i).getInt(0);
+      }
+      avg /= windowSize;
+      for (int i = 0; i < windowSize; i++) {
+        double delta = rowWindow.getRow(i).getInt(0) - avg;
+        sum += delta * delta;
+      }
+      collector.putDouble(time, sum / windowSize);
+    }
+
+    @Override
+    public void aggregateLong(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      double avg = 0, sum = 0;
+      for (int i = 0; i < windowSize; i++) {
+        avg += rowWindow.getRow(i).getLong(0);
+      }
+      avg /= windowSize;
+      for (int i = 0; i < windowSize; i++) {
+        double delta = rowWindow.getRow(i).getLong(0) - avg;
+        sum += delta * delta;
+      }
+      collector.putDouble(time, sum / windowSize);
+    }
+
+    @Override
+    public void aggregateFloat(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      double avg = 0, sum = 0;
+      for (int i = 0; i < windowSize; i++) {
+        avg += rowWindow.getRow(i).getFloat(0);
+      }
+      avg /= windowSize;
+      for (int i = 0; i < windowSize; i++) {
+        double delta = rowWindow.getRow(i).getFloat(0) - avg;
+        sum += delta * delta;
+      }
+      collector.putDouble(time, sum / windowSize);
+    }
+
+    @Override
+    public void aggregateDouble(RowWindow rowWindow, PointCollector collector) throws IOException {
+      long time = rowWindow.getRow(0).getTime();
+      int windowSize = rowWindow.windowSize();
+
+      double avg = 0, sum = 0;
+      for (int i = 0; i < windowSize; i++) {
+        avg += rowWindow.getRow(i).getDouble(0);
+      }
+      avg /= windowSize;
+      for (int i = 0; i < windowSize; i++) {
+        double delta = rowWindow.getRow(i).getDouble(0) - avg;
+        sum += delta * delta;
+      }
+      collector.putDouble(time, sum / windowSize);
+    }
+  }
+
+  /**
+   * Runs the common bucket-sampling validation, then reads and checks the {@code type} parameter.
+   *
+   * @throws UDFException if the aggregation type is not one of the supported names
+   */
+  @Override
+  public void validate(UDFParameterValidator validator) throws MetadataException, UDFException {
+    super.validate(validator);
+    // Locale.ROOT: a default-locale toLowerCase() maps 'I' to a dotless 'ı' under e.g. Turkish
+    // locales, which would make "VARIANCE" fail validation.
+    aggMethodType =
+        validator
+            .getParameters()
+            .getStringOrDefault("type", "avg")
+            .toLowerCase(java.util.Locale.ROOT);
+    validator.validate(
+        type ->
+            "avg".equals(type)
+                || "max".equals(type)
+                || "min".equals(type)
+                || "sum".equals(type)
+                || "extreme".equals(type)
+                || "variance".equals(type),
+        "Illegal aggregation method. Aggregation type should be avg, min, max, sum, extreme, variance.",
+        aggMethodType);
+  }
+
+  /**
+   * Configures windows of {@code bucketSize} rows, the output data type, and the aggregator.
+   *
+   * @throws UDFParameterNotValidException if the aggregation type is unknown (not reachable after
+   *     {@link #validate})
+   */
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws UDFParameterNotValidException {
+    // avg and variance always produce DOUBLE; every other aggregation keeps the input type.
+    TSDataType outputDataType = dataType;
+    if ("avg".equals(aggMethodType) || "variance".equals(aggMethodType)) {
+      outputDataType = TSDataType.DOUBLE;
+    }
+    configurations
+        .setAccessStrategy(new SlidingSizeWindowAccessStrategy(bucketSize))
+        .setOutputDataType(outputDataType);
+    switch (aggMethodType) {
+      case "avg":
+        aggregator = new AvgAggregator();
+        break;
+      case "min":
+        aggregator = new MinAggregator();
+        break;
+      case "max":
+        aggregator = new MaxAggregator();
+        break;
+      case "sum":
+        aggregator = new SumAggregator();
+        break;
+      case "extreme":
+        aggregator = new ExtremeAggregator();
+        break;
+      case "variance":
+        aggregator = new VarianceAggregator();
+        break;
+      default:
+        throw new UDFParameterNotValidException(
+            "Illegal aggregation method. Aggregation type should be avg, min, max, sum, extreme, variance.");
+    }
+  }
+
+  /** Aggregates one window with the configured aggregator, dispatching on the input type. */
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector)
+      throws IOException, UDFParameterNotValidException {
+    switch (dataType) {
+      case INT32:
+        aggregator.aggregateInt(rowWindow, collector);
+        break;
+      case INT64:
+        aggregator.aggregateLong(rowWindow, collector);
+        break;
+      case FLOAT:
+        aggregator.aggregateFloat(rowWindow, collector);
+        break;
+      case DOUBLE:
+        aggregator.aggregateDouble(rowWindow, collector);
+        break;
+      default:
+        // Not reachable: validate() only accepts the four numeric types above.
+        throw new UDFInputSeriesDataTypeNotValidException(
+            0, dataType, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketM4Sample.java b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketM4Sample.java
new file mode 100644
index 0000000000..ee6b0c59c0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketM4Sample.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.builtin;
+
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.access.RowWindow;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.exception.UDFException;
+import org.apache.iotdb.db.query.udf.api.exception.UDFInputSeriesDataTypeNotValidException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.io.IOException;
+
+/**
+ * Equal-size bucket M4 sampling: from each window of rows it keeps the first row, the rows holding
+ * the minimum and maximum of the interior rows, and the last row, so each window contributes at
+ * most four points (in row order).
+ */
+public class UDTFEqualSizeBucketM4Sample extends UDTFEqualSizeBucketSample {
+
+  /**
+   * Configures windows of {@code 4 * bucketSize} rows with the input data type as output type.
+   *
+   * <p>Each window yields up to four points, so the window is four times the base bucket size to
+   * keep the overall sampling ratio close to {@code proportion}.
+   */
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) {
+    // Saturate at Integer.MAX_VALUE: for very small proportions bucketSize can exceed
+    // Integer.MAX_VALUE / 4, and plain int multiplication would wrap to a negative window size.
+    bucketSize = (int) Math.min(4L * bucketSize, Integer.MAX_VALUE);
+    configurations
+        .setAccessStrategy(new SlidingSizeWindowAccessStrategy(bucketSize))
+        .setOutputDataType(dataType);
+  }
+
+  /** M4-samples one window, dispatching on the input data type. */
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector)
+      throws UDFException, IOException {
+    switch (dataType) {
+      case INT32:
+        transformInt(rowWindow, collector);
+        break;
+      case INT64:
+        transformLong(rowWindow, collector);
+        break;
+      case FLOAT:
+        transformFloat(rowWindow, collector);
+        break;
+      case DOUBLE:
+        transformDouble(rowWindow, collector);
+        break;
+      default:
+        // Not reachable: validate() only accepts the four numeric types above.
+        throw new UDFInputSeriesDataTypeNotValidException(
+            0, dataType, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE);
+    }
+  }
+
+  /**
+   * M4-samples one INT32 window (column 0).
+   *
+   * <p>Windows with fewer than 4 rows are emitted unchanged. Otherwise emits, in row order: row 0,
+   * the minimum and maximum rows among the interior rows (indices 1 .. size-2), and the last row.
+   * Ties keep the first occurrence. If min and max resolve to the same row (e.g. all interior
+   * values are equal), row {@code size - 2} stands in for the maximum so four rows are emitted.
+   */
+  public void transformInt(RowWindow rowWindow, PointCollector collector) throws IOException {
+    if (rowWindow.windowSize() < 4) {
+      for (int i = 0; i < rowWindow.windowSize(); i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putInt(row.getTime(), row.getInt(0));
+      }
+      return;
+    }
+
+    int minIndex = 1, maxIndex = 1;
+    int maxValue = rowWindow.getRow(1).getInt(0);
+    int minValue = rowWindow.getRow(1).getInt(0);
+    for (int i = 2; i < rowWindow.windowSize() - 1; i++) {
+      int value = rowWindow.getRow(i).getInt(0);
+      if (minValue > value) {
+        minValue = value;
+        minIndex = i;
+      }
+      if (maxValue < value) {
+        maxValue = value;
+        maxIndex = i;
+      }
+    }
+    if (minIndex == maxIndex) {
+      maxIndex = rowWindow.windowSize() - 2;
+    }
+
+    Row row = rowWindow.getRow(0);
+    collector.putInt(row.getTime(), row.getInt(0));
+    // Emit the min and max rows in row order so the output stays sorted.
+    if (maxIndex < minIndex) {
+      row = rowWindow.getRow(maxIndex);
+      collector.putInt(row.getTime(), row.getInt(0));
+      row = rowWindow.getRow(minIndex);
+    } else {
+      row = rowWindow.getRow(minIndex);
+      collector.putInt(row.getTime(), row.getInt(0));
+      row = rowWindow.getRow(maxIndex);
+    }
+    collector.putInt(row.getTime(), row.getInt(0));
+    row = rowWindow.getRow(rowWindow.windowSize() - 1);
+    collector.putInt(row.getTime(), row.getInt(0));
+  }
+
+  /** M4-samples one INT64 window (column 0); same selection rules as {@link #transformInt}. */
+  public void transformLong(RowWindow rowWindow, PointCollector collector) throws IOException {
+    if (rowWindow.windowSize() < 4) {
+      for (int i = 0; i < rowWindow.windowSize(); i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putLong(row.getTime(), row.getLong(0));
+      }
+      return;
+    }
+
+    int minIndex = 1, maxIndex = 1;
+    long maxValue = rowWindow.getRow(1).getLong(0);
+    long minValue = rowWindow.getRow(1).getLong(0);
+    for (int i = 2; i < rowWindow.windowSize() - 1; i++) {
+      long value = rowWindow.getRow(i).getLong(0);
+      if (minValue > value) {
+        minValue = value;
+        minIndex = i;
+      }
+      if (maxValue < value) {
+        maxValue = value;
+        maxIndex = i;
+      }
+    }
+    if (minIndex == maxIndex) {
+      maxIndex = rowWindow.windowSize() - 2;
+    }
+
+    Row row = rowWindow.getRow(0);
+    collector.putLong(row.getTime(), row.getLong(0));
+    // Emit the min and max rows in row order so the output stays sorted.
+    if (maxIndex < minIndex) {
+      row = rowWindow.getRow(maxIndex);
+      collector.putLong(row.getTime(), row.getLong(0));
+      row = rowWindow.getRow(minIndex);
+    } else {
+      row = rowWindow.getRow(minIndex);
+      collector.putLong(row.getTime(), row.getLong(0));
+      row = rowWindow.getRow(maxIndex);
+    }
+    collector.putLong(row.getTime(), row.getLong(0));
+    row = rowWindow.getRow(rowWindow.windowSize() - 1);
+    collector.putLong(row.getTime(), row.getLong(0));
+  }
+
+  /** M4-samples one FLOAT window (column 0); same selection rules as {@link #transformInt}. */
+  public void transformFloat(RowWindow rowWindow, PointCollector collector) throws IOException {
+    if (rowWindow.windowSize() < 4) {
+      for (int i = 0; i < rowWindow.windowSize(); i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putFloat(row.getTime(), row.getFloat(0));
+      }
+      return;
+    }
+
+    int minIndex = 1, maxIndex = 1;
+    float maxValue = rowWindow.getRow(1).getFloat(0);
+    float minValue = rowWindow.getRow(1).getFloat(0);
+    for (int i = 2; i < rowWindow.windowSize() - 1; i++) {
+      float value = rowWindow.getRow(i).getFloat(0);
+      if (minValue > value) {
+        minValue = value;
+        minIndex = i;
+      }
+      if (maxValue < value) {
+        maxValue = value;
+        maxIndex = i;
+      }
+    }
+    if (minIndex == maxIndex) {
+      maxIndex = rowWindow.windowSize() - 2;
+    }
+
+    Row row = rowWindow.getRow(0);
+    collector.putFloat(row.getTime(), row.getFloat(0));
+    // Emit the min and max rows in row order so the output stays sorted.
+    if (maxIndex < minIndex) {
+      row = rowWindow.getRow(maxIndex);
+      collector.putFloat(row.getTime(), row.getFloat(0));
+      row = rowWindow.getRow(minIndex);
+    } else {
+      row = rowWindow.getRow(minIndex);
+      collector.putFloat(row.getTime(), row.getFloat(0));
+      row = rowWindow.getRow(maxIndex);
+    }
+    collector.putFloat(row.getTime(), row.getFloat(0));
+    row = rowWindow.getRow(rowWindow.windowSize() - 1);
+    collector.putFloat(row.getTime(), row.getFloat(0));
+  }
+
+  /** M4-samples one DOUBLE window (column 0); same selection rules as {@link #transformInt}. */
+  public void transformDouble(RowWindow rowWindow, PointCollector collector) throws IOException {
+    if (rowWindow.windowSize() < 4) {
+      for (int i = 0; i < rowWindow.windowSize(); i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putDouble(row.getTime(), row.getDouble(0));
+      }
+      return;
+    }
+
+    int minIndex = 1, maxIndex = 1;
+    double maxValue = rowWindow.getRow(1).getDouble(0);
+    double minValue = rowWindow.getRow(1).getDouble(0);
+    for (int i = 2; i < rowWindow.windowSize() - 1; i++) {
+      double value = rowWindow.getRow(i).getDouble(0);
+      if (minValue > value) {
+        minValue = value;
+        minIndex = i;
+      }
+      if (maxValue < value) {
+        maxValue = value;
+        maxIndex = i;
+      }
+    }
+    if (minIndex == maxIndex) {
+      maxIndex = rowWindow.windowSize() - 2;
+    }
+
+    Row row = rowWindow.getRow(0);
+    collector.putDouble(row.getTime(), row.getDouble(0));
+    // Emit the min and max rows in row order so the output stays sorted.
+    if (maxIndex < minIndex) {
+      row = rowWindow.getRow(maxIndex);
+      collector.putDouble(row.getTime(), row.getDouble(0));
+      row = rowWindow.getRow(minIndex);
+    } else {
+      row = rowWindow.getRow(minIndex);
+      collector.putDouble(row.getTime(), row.getDouble(0));
+      row = rowWindow.getRow(maxIndex);
+    }
+    collector.putDouble(row.getTime(), row.getDouble(0));
+    row = rowWindow.getRow(rowWindow.windowSize() - 1);
+    collector.putDouble(row.getTime(), row.getDouble(0));
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketRandomSample.java b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketRandomSample.java
new file mode 100644
index 0000000000..3b45f194c8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketRandomSample.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.builtin;
+
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.access.RowWindow;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.exception.UDFInputSeriesDataTypeNotValidException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * Equal-size bucket random sampling: emits one randomly chosen row (chosen with {@link
+ * Random#nextInt(int)}) from each window of {@code bucketSize} rows.
+ */
+public class UDTFEqualSizeBucketRandomSample extends UDTFEqualSizeBucketSample {
+
+  /** Source of the per-window row choice; created in {@link #beforeStart}. */
+  private Random random;
+
+  /** Creates the random source and configures windows of {@code bucketSize} rows. */
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) {
+    random = new Random();
+    configurations
+        .setAccessStrategy(new SlidingSizeWindowAccessStrategy(bucketSize))
+        .setOutputDataType(dataType);
+  }
+
+  /**
+   * Picks one row of the window at random and emits its timestamp and column-0 value unchanged.
+   *
+   * @throws UDFInputSeriesDataTypeNotValidException for a non-numeric input type (not reachable
+   *     after validation)
+   */
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector)
+      throws IOException, UDFInputSeriesDataTypeNotValidException {
+    Row row = rowWindow.getRow(random.nextInt(rowWindow.windowSize()));
+    switch (dataType) {
+      case INT32:
+        collector.putInt(row.getTime(), row.getInt(0));
+        break;
+      case INT64:
+        collector.putLong(row.getTime(), row.getLong(0));
+        break;
+      case FLOAT:
+        collector.putFloat(row.getTime(), row.getFloat(0));
+        break;
+      case DOUBLE:
+        collector.putDouble(row.getTime(), row.getDouble(0));
+        // Without this break DOUBLE fell through to default and every DOUBLE series threw.
+        break;
+      default:
+        // Not reachable: validate() only accepts the four numeric types above.
+        throw new UDFInputSeriesDataTypeNotValidException(
+            0, dataType, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketSample.java b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketSample.java
new file mode 100644
index 0000000000..3d554140cb
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketSample.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.builtin;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.query.udf.api.UDTF;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.db.query.udf.api.exception.UDFException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+/**
+ * Shared base of the equal-size bucket sampling UDTFs.
+ *
+ * <p>{@link #validate} requires exactly one INT32/INT64/FLOAT/DOUBLE input series and a {@code
+ * proportion} parameter in (0, 1] (default 0.1), then derives {@link #bucketSize} from it.
+ * Subclasses configure their window strategy from these fields.
+ */
+public abstract class UDTFEqualSizeBucketSample implements UDTF {
+
+  /** Data type of the single input series (column 0). */
+  protected TSDataType dataType;
+  /** Requested sampling proportion; validated to lie in (0, 1]. */
+  protected double proportion;
+  /** Rows per bucket: {@code 1 / proportion} truncated to an int. */
+  protected int bucketSize;
+
+  /**
+   * Reads and checks the input series and the {@code proportion} parameter, then initializes the
+   * protected fields.
+   *
+   * @throws UDFException if the series count/type or the proportion is invalid
+   */
+  @Override
+  public void validate(UDFParameterValidator validator) throws MetadataException, UDFException {
+    proportion = validator.getParameters().getDoubleOrDefault("proportion", 0.1);
+
+    validator
+        .validateInputSeriesNumber(1)
+        .validateInputSeriesDataType(
+            0, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE)
+        .validate(
+            candidate -> {
+              double ratio = (double) candidate;
+              return 0 < ratio && ratio <= 1;
+            },
+            "Illegal sample proportion. proportion > 0 and proportion <= 1",
+            proportion);
+
+    dataType = validator.getParameters().getDataType(0);
+    // Truncating cast: e.g. proportion 0.3 -> 3.33 -> buckets of 3 rows.
+    double reciprocal = 1.0 / proportion;
+    bucketSize = (int) reciprocal;
+  }
+}