You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/02/21 01:57:44 UTC

[iotdb] branch master updated: [IOTDB-4438] Add session window in RawDataAggregationOperator window management framework

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ace6afa08 [IOTDB-4438] Add session window in RawDataAggregationOperator window management framework
6ace6afa08 is described below

commit 6ace6afa086bd5d4622d50c1a68408ba8f5e83b2
Author: AACEPT <34...@users.noreply.github.com>
AuthorDate: Tue Feb 21 09:57:38 2023 +0800

    [IOTDB-4438] Add session window in RawDataAggregationOperator window management framework
---
 .../org/apache/iotdb/db/qp/sql/IdentifierParser.g4 |   1 +
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |   1 +
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |   4 +
 docs/UserGuide/Query-Data/Align-By.md              |   2 +-
 docs/UserGuide/Query-Data/Group-By.md              |  82 +++-
 docs/UserGuide/Query-Data/Overview.md              |   5 +-
 docs/zh/UserGuide/Query-Data/Group-By.md           | 120 ++++-
 docs/zh/UserGuide/Query-Data/Overview.md           |   5 +-
 .../iotdb/db/it/groupby/IoTDBGroupBySessionIT.java | 509 +++++++++++++++++++++
 .../codegen/templates/evEventWindowManager.ftl     |   6 +-
 .../process/RawDataAggregationOperator.java        |   4 +-
 .../execution/operator/window/IWindowManager.java  |   5 +-
 .../execution/operator/window/SeriesWindow.java    |   4 +
 .../operator/window/SeriesWindowManager.java       |   3 +
 .../execution/operator/window/SessionWindow.java   | 157 +++++++
 ...indowManager.java => SessionWindowManager.java} | 118 +++--
 ...WindowType.java => SessionWindowParameter.java} |  17 +-
 .../operator/window/WindowManagerFactory.java      |  13 +-
 .../mpp/execution/operator/window/WindowType.java  |   3 +-
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  13 +-
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       |  24 +-
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  15 +-
 .../planner/plan/parameter/GroupByParameter.java   |  17 +-
 .../plan/parameter/GroupBySeriesParameter.java     |  15 +-
 ...Parameter.java => GroupBySessionParameter.java} |  35 +-
 .../plan/parameter/GroupByVariationParameter.java  |  20 +-
 .../component/GroupBySessionComponent.java}        |  20 +-
 .../db/mpp/plan/statement/crud/QueryStatement.java |   3 +
 .../operator/RawDataAggregationOperatorTest.java   |  59 +++
 29 files changed, 1107 insertions(+), 173 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
index 7358eb54f6..0a620a8e99 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
@@ -156,6 +156,7 @@ keyWords
     | SELECT
     | SERIES
     | SERIESSLOTID
+    | SESSION
     | SET
     | SETTLE
     | SGLEVEL
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index f213a4187b..4359ec507c 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -482,6 +482,7 @@ groupByAttributeClause
     | TAGS LR_BRACKET identifier (COMMA identifier)* RR_BRACKET
     | VARIATION LR_BRACKET expression (COMMA delta=number)? (COMMA attributePair)? RR_BRACKET
     | SERIES LR_BRACKET expression (COMMA expression)? (COMMA attributePair)? RR_BRACKET
+    | SESSION LR_BRACKET timeInterval=DURATION_LITERAL RR_BRACKET
     ;
 
 number
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index 7f4d4e496d..0b1a1ca6bc 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -562,6 +562,10 @@ SERIESSLOTID
     : S E R I E S S L O T I D
     ;
 
+SESSION
+    : S E S S I O N
+    ;
+
 SET
     : S E T
     ;
diff --git a/docs/UserGuide/Query-Data/Align-By.md b/docs/UserGuide/Query-Data/Align-By.md
index c7e32c5d2d..ae117d2074 100644
--- a/docs/UserGuide/Query-Data/Align-By.md
+++ b/docs/UserGuide/Query-Data/Align-By.md
@@ -55,7 +55,7 @@ The result shows below:
 Total line number = 6
 It costs 0.012s
 ```
-### ordering in ALIGN BY DEVICE
+## Ordering in ALIGN BY DEVICE
 
 ALIGN BY DEVICE mode arranges according to the device first, and sort each device in ascending order according to the timestamp. The ordering and priority can be adjusted through `ORDER BY` clause.
 
diff --git a/docs/UserGuide/Query-Data/Group-By.md b/docs/UserGuide/Query-Data/Group-By.md
index f081b4af4e..a817b0afc9 100644
--- a/docs/UserGuide/Query-Data/Group-By.md
+++ b/docs/UserGuide/Query-Data/Group-By.md
@@ -744,7 +744,7 @@ For the following raw data, several query examples are given below:
 +-----------------------------+-------------------------+-------------------------------------+------------------------------------+
 ```
 The sql statement to query data with at least two continuous row shown below: 
-```
+```sql
 select __endTime,max_time(charging_status),count(vehicle_status),last_value(soc) from root.** group by series(charging_status=1,KEEP>=2,ignoringNull=true)
 ```
 Get the result below:
@@ -757,7 +757,7 @@ Get the result below:
 +-----------------------------+---------+-----------------------------------------------+-------------------------------------------+-------------------------------------+
 ```
 When ignoreNull is false, the null value will be treated as a row that doesn't meet the condition.
-```
+```sql
 select __endTime,max_time(charging_status),count(vehicle_status),last_value(soc) from root.** group by series(charging_status=1,KEEP>=2,ignoringNull=false)
 ```
 Get the result below, the original group is split.
@@ -769,4 +769,82 @@ Get the result below, the original group is split.
 |1970-01-01T08:00:00.005+08:00|        7|                                              7|                                          3|                                 36.0|
 |1970-01-01T08:00:00.009+08:00|       10|                                             10|                                          2|                                 60.0|
 +-----------------------------+---------+-----------------------------------------------+-------------------------------------------+-------------------------------------+
+```
+
+## Aggregation By Session
+`GROUP BY SESSION` groups data according to the time interval between adjacent rows. Consecutive rows whose timestamps differ by no more than the given threshold are assigned to the same group.
+For example, in industrial scenarios, devices don't always run continuously; `GROUP BY SESSION` groups the data generated by each access session of the device.
+Its syntax is defined as follows:
+```sql
+group by session(timeInterval)
+```
+* timeInterval
+
+The interval threshold: a new group is created when the time difference between two adjacent data points is greater than this threshold.
+
+The figure below is a grouping diagram under `GROUP BY SESSION`.
+
+<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="https://raw.githubusercontent.com/apache/iotdb-bin-resources/main/docs/UserGuide/Process-Data/GroupBy/SessionGroup.jpg">
+
+### Precautions for Use
+1. For a group in the result set, the time column outputs the start time of the group by default. `__endTime` can be used in the select clause to output the end time of each group in the result set.
+2. Each device is grouped separately when used with `ALIGN BY DEVICE`.
+
+For the raw data below, a few query examples are given:
+```
++-----------------------------+-----------------+-----------+--------+------+
+|                         Time|           Device|temperature|hardware|status|
++-----------------------------+-----------------+-----------+--------+------+
+|1970-01-01T08:00:01.000+08:00|root.ln.wf02.wt01|       35.7|      11| false|
+|1970-01-01T08:00:02.000+08:00|root.ln.wf02.wt01|       35.8|      22|  true|
+|1970-01-01T08:00:03.000+08:00|root.ln.wf02.wt01|       35.4|      33| false|
+|1970-01-01T08:00:04.000+08:00|root.ln.wf02.wt01|       36.4|      44| false|
+|1970-01-01T08:00:05.000+08:00|root.ln.wf02.wt01|       36.8|      55| false|
+|1970-01-01T08:00:10.000+08:00|root.ln.wf02.wt01|       36.8|     110| false|
+|1970-01-01T08:00:20.000+08:00|root.ln.wf02.wt01|       37.8|     220|  true|
+|1970-01-01T08:00:30.000+08:00|root.ln.wf02.wt01|       37.5|     330| false|
+|1970-01-01T08:00:40.000+08:00|root.ln.wf02.wt01|       37.4|     440| false|
+|1970-01-01T08:00:50.000+08:00|root.ln.wf02.wt01|       37.9|     550| false|
+|1970-01-01T08:01:40.000+08:00|root.ln.wf02.wt01|       38.0|     110| false|
+|1970-01-01T08:02:30.000+08:00|root.ln.wf02.wt01|       38.8|     220|  true|
+|1970-01-01T08:03:20.000+08:00|root.ln.wf02.wt01|       38.6|     330| false|
+|1970-01-01T08:04:20.000+08:00|root.ln.wf02.wt01|       38.4|     440| false|
+|1970-01-01T08:05:20.000+08:00|root.ln.wf02.wt01|       38.3|     550| false|
+|1970-01-01T08:06:40.000+08:00|root.ln.wf02.wt01|       null|       0|  null|
+|1970-01-01T08:07:50.000+08:00|root.ln.wf02.wt01|       null|       0|  null|
+|1970-01-01T08:08:00.000+08:00|root.ln.wf02.wt01|       null|       0|  null|
+|1970-01-02T08:08:01.000+08:00|root.ln.wf02.wt01|       38.2|     110| false|
+|1970-01-02T08:08:02.000+08:00|root.ln.wf02.wt01|       37.5|     220|  true|
+|1970-01-02T08:08:03.000+08:00|root.ln.wf02.wt01|       37.4|     330| false|
+|1970-01-02T08:08:04.000+08:00|root.ln.wf02.wt01|       36.8|     440| false|
+|1970-01-02T08:08:05.000+08:00|root.ln.wf02.wt01|       37.4|     550| false|
++-----------------------------+-----------------+-----------+--------+------+
+```
+The time interval can be specified in different time units. The SQL statement is shown below:
+```sql
+select __endTime,count(*) from root.** group by session(1d)
+```
+Get the result:
+```
++-----------------------------+---------+------------------------------------+---------------------------------+-------------------------------+
+|                         Time|__endTime|count(root.ln.wf02.wt01.temperature)|count(root.ln.wf02.wt01.hardware)|count(root.ln.wf02.wt01.status)|
++-----------------------------+---------+------------------------------------+---------------------------------+-------------------------------+
+|1970-01-01T08:00:01.000+08:00|   480000|                                  15|                               18|                             15|
+|1970-01-02T08:08:01.000+08:00| 86885000|                                   5|                                5|                              5|
++-----------------------------+---------+------------------------------------+---------------------------------+-------------------------------+
+```
+It can also be used with `HAVING` and `ALIGN BY DEVICE` clauses.
+```sql
+select __endTime,sum(hardware) from root.ln.wf02.wt01 group by session(50s) having sum(hardware)>0 align by device
+```
+Get the result below, where the groups whose `sum(hardware)` is 0 are excluded:
+```
++-----------------------------+-----------------+---------+-------------+
+|                         Time|           Device|__endTime|sum(hardware)|
++-----------------------------+-----------------+---------+-------------+
+|1970-01-01T08:00:01.000+08:00|root.ln.wf02.wt01|   200000|       2475.0|
+|1970-01-01T08:04:20.000+08:00|root.ln.wf02.wt01|   260000|        440.0|
+|1970-01-01T08:05:20.000+08:00|root.ln.wf02.wt01|   320000|        550.0|
+|1970-01-02T08:08:01.000+08:00|root.ln.wf02.wt01| 86885000|       1650.0|
++-----------------------------+-----------------+---------+-------------+
 ```
\ No newline at end of file
diff --git a/docs/UserGuide/Query-Data/Overview.md b/docs/UserGuide/Query-Data/Overview.md
index 0839e73d2e..3cb72cb0f6 100644
--- a/docs/UserGuide/Query-Data/Overview.md
+++ b/docs/UserGuide/Query-Data/Overview.md
@@ -35,7 +35,8 @@ SELECT [LAST] selectExpr [, selectExpr] ...
         LEVEL = levelNum [, levelNum] ... |
         TAGS(tagKey [, tagKey] ... )
         VARIATION(expression[,delta][,ignoreNull=true/false])|
-        SERIES(expression,[keep>/>=/=/</<=]threshold[,ignoreNull=true/false])
+        SERIES(expression,[keep>/>=/=/</<=]threshold[,ignoreNull=true/false])|
+        SESSION(timeInterval)
     }]
     [HAVING havingCondition]
     [ORDER BY sortKey {ASC | DESC}]
@@ -75,7 +76,7 @@ SELECT [LAST] selectExpr [, selectExpr] ...
 ### `GROUP BY` clause
 
 - The `GROUP BY` clause specifies how the time series are aggregated by segment or group.
-- Segmented aggregation refers to segmenting data in the row direction according to the time dimension, aiming at the time relationship between different data points in the same time series, and obtaining an aggregated value for each segment. Currently only **segmentation by time interval**、**group by variation** and **group by series** is supported, and more segmentation methods will be supported in the future.
+- Segmented aggregation refers to segmenting data in the row direction according to the time dimension, aiming at the time relationship between different data points in the same time series, and obtaining an aggregated value for each segment. Currently only **segmentation by time interval**, **group by variation**, **group by series** and **group by session** are supported, and more segmentation methods will be supported in the future.
 - Group aggregation refers to grouping the potential business attributes of time series for different time series. Each group contains several time series, and each group gets an aggregated value. Support **group by path level** and **group by tag** two grouping methods.
 - Segment aggregation and group aggregation can be mixed.
 - For details and examples, see the document [Group By Aggregation](./Group-By.md).
diff --git a/docs/zh/UserGuide/Query-Data/Group-By.md b/docs/zh/UserGuide/Query-Data/Group-By.md
index a8566c3277..e3b9f386ae 100644
--- a/docs/zh/UserGuide/Query-Data/Group-By.md
+++ b/docs/zh/UserGuide/Query-Data/Group-By.md
@@ -581,24 +581,24 @@ group by variation(controlExpression[,delta][,ignoreNull=true/false])
 分组所参照的值,可以是查询数据中的某一列或是多列的表达式。 
 * delta
 
-分组所使用的阈值,同一分组中每条数据expression对应的值与第一个的差值都小于delta。当delta=0时,相当于一个等值分组,所有连续且expression值相同的数据将被分到一组。
+分组所使用的阈值,同一分组中每条数据expression对应的值与第一个的差值都小于`delta`。当`delta=0`时,相当于一个等值分组,所有连续且expression值相同的数据将被分到一组。
 
 * ignoreNull
 
-用于指定controlExpression的计算返回值为null时对数据的处理方式,当ignoreNull为false时,null值会被视为新的值,ignoreNull为true时,则直接跳过对应的数据。
+用于指定`controlExpression`的计算返回值为null时对数据的处理方式,当`ignoreNull`为false时,null值会被视为新的值,`ignoreNull`为true时,则直接跳过对应的数据。
 
-在delta取不同值时,controlExpression支持的返回数据类型以及当ignoreNull=false对于null值的处理方式可以见下表:
+在`delta`取不同值时,`controlExpression`支持的返回数据类型以及当`ignoreNull`为false时对于null值的处理方式可以见下表:
 
-|delta| 支持的controlExpression的返回类型            | ignoreNull=false时对于Null值的处理                                     |
-|-----|--------------------------------------|-----------------------------------------------------------------|
-|delta!=0| INT32、INT64、FLOAT、DOUBLE             | 若正在维护分组的值不为null,null视为无穷大/无穷小,结束当前分组。连续的null视为相等稳定的值,会被分配在同一个分组 
-|delta=0| TEXT、BINARY、INT32、INT64、FLOAT、DOUBLE | null被视为新分组中的新值,连续的null属于相同的分组                                       
+| delta    | 支持的controlExpression的返回类型            | ignoreNull=false时对于Null值的处理                                     |
+|----------|--------------------------------------|-----------------------------------------------------------------|
+| delta!=0 | INT32、INT64、FLOAT、DOUBLE             | 若正在维护分组的值不为null,null视为无穷大/无穷小,结束当前分组。连续的null视为相等稳定的值,会被分配在同一个分组 |
+| delta=0  | TEXT、BINARY、INT32、INT64、FLOAT、DOUBLE | null被视为新分组中的新值,连续的null属于相同的分组                                   |
 
 ### 使用注意事项
-1. controlExpression的结果应该为唯一值,如果使用通配符拼接后出现多列,则报错。
-2. 对于一个分组,默认Time列输出分组的开始时间,查询时可以使用select __endTime的方式来使得结果输出分组的结束时间。
+1. `controlExpression`的结果应该为唯一值,如果使用通配符拼接后出现多列,则报错。
+2. 对于一个分组,默认Time列输出分组的开始时间,查询时可以使用select `__endTime`的方式来使得结果输出分组的结束时间。
 3. 与`ALIGN BY DEVICE`搭配使用时会对每个device进行单独的分组操作。
-4. 当没有指定delta和ignoreNull时,delta默认为0,ignoreNull默认为true。
+4. 当没有指定`delta`和`ignoreNull`时,`delta`默认为0,`ignoreNull`默认为true。
 5. 当前暂不支持与`GROUP BY LEVEL`搭配使用。
 
 使用如下的原始数据,接下来会给出几个事件分段查询的使用样例
@@ -621,7 +621,7 @@ group by variation(controlExpression[,delta][,ignoreNull=true/false])
 ```
 ### delta=0时的等值事件分段
 使用如下sql语句
-```
+```sql
 select __endTime, avg(s1), count(s2), sum(s3) from root.sg.d group by variation(s6)
 ```
 得到如下的查询结果,这里忽略了s6为null的行
@@ -636,7 +636,7 @@ select __endTime, avg(s1), count(s2), sum(s3) from root.sg.d group by variation(
 +-----------------------------+---------+-----------------+-------------------+-----------------+
 ```
 当指定ignoreNull为false时,会将s6为null的数据也考虑进来
-```
+```sql
 select __endTime, avg(s1), count(s2), sum(s3) from root.sg.d group by variation(s6,ignoreNull=false)
 ```
 得到如下的结果
@@ -655,7 +655,7 @@ select __endTime, avg(s1), count(s2), sum(s3) from root.sg.d group by variation(
 ```
 ### delta!=0时的差值事件分段
 使用如下sql语句
-```
+```sql
 select __endTime, avg(s1), count(s2), sum(s3) from root.sg.d group by variation(s6+, 4)
 ```
 得到如下的查询结果
@@ -670,7 +670,7 @@ select __endTime, avg(s1), count(s2), sum(s3) from root.sg.d group by variation(
 ```
 group by子句中的controlExpression同样支持列的表达式
 
-```
+```sql
 select __endTime, avg(s1), count(s2), sum(s3) from root.sg.d group by variation(s6+s5, 10)
 ```
 得到如下的查询结果
@@ -687,7 +687,7 @@ select __endTime, avg(s1), count(s2), sum(s3) from root.sg.d group by variation(
 ## 事件条件分段聚合
 当需要根据指定条件对数据行进行筛选,并将连续的数据分为一组进行聚合运算时,可以使用`GROUP BY SERIES`的分段方式;不满足的给定条件的行因为不属于任何分组会被直接简单忽略。
 其语法定义如下:
-```
+```sql
 group by series(predict,[keep>/>=/=/<=/<]threshold,[,ignoreNull=true/false])
 ```
 * predict
@@ -702,10 +702,10 @@ keep表达式用来指定形成分组所需要连续满足`predict`条件的数
 用于指定遇到predict为null的数据行时的处理方式,为true则跳过该行,为false则结束当前分组。
 
 ### 使用注意事项
-1. keep条件在查询中是必需的,但可以省略掉'keep'字符串给出一个常数,默认为keep=该常数的等于条件。
-2. ignoreNull默认为true。
-3. 对于一个分组,默认Time列输出分组的开始时间,查询时可以使用select __endTime的方式来使得结果输出分组的结束时间。
-4. 与`align by device`搭配使用时会对每个device进行单独的分组操作。
+1. keep条件在查询中是必需的,但可以省略掉'keep'字符串给出一个常数,默认为`keep=该常数`的等于条件。
+2. `ignoreNull`默认为true。
+3. 对于一个分组,默认Time列输出分组的开始时间,查询时可以使用select `__endTime`的方式来使得结果输出分组的结束时间。
+4. 与`ALIGN BY DEVICE`搭配使用时会对每个device进行单独的分组操作。
 
 
 对于如下原始数据,下面会给出几个查询样例:
@@ -726,7 +726,7 @@ keep表达式用来指定形成分组所需要连续满足`predict`条件的数
 +-----------------------------+-------------------------+-------------------------------------+------------------------------------+
 ```
 查询至少连续两行以上的charging_status=1的数据,sql语句如下:
-```
+```sql
 select __endTime,max_time(charging_status),count(vehicle_status),last_value(soc) from root.** group by series(charging_status=1,KEEP>=2,ignoringNull=true)
 ```
 得到结果如下:
@@ -738,8 +738,8 @@ select __endTime,max_time(charging_status),count(vehicle_status),last_value(soc)
 |1970-01-01T08:00:00.005+08:00|       10|                                             10|                                          5|                                 60.0|
 +-----------------------------+---------+-----------------------------------------------+-------------------------------------------+-------------------------------------+
 ```
-当设置ignoreNull为false时,遇到null值为将其视为一个不满足条件的行,会结束正在计算的分组。
-```
+当设置`ignoreNull`为false时,遇到null值会将其视为一个不满足条件的行,会结束正在计算的分组。
+```sql
 select __endTime,max_time(charging_status),count(vehicle_status),last_value(soc) from root.** group by series(charging_status=1,KEEP>=2,ignoringNull=false)
 ```
 得到如下结果,原先的分组被含null的行拆分:
@@ -751,4 +751,80 @@ select __endTime,max_time(charging_status),count(vehicle_status),last_value(soc)
 |1970-01-01T08:00:00.005+08:00|        7|                                              7|                                          3|                                 36.0|
 |1970-01-01T08:00:00.009+08:00|       10|                                             10|                                          2|                                 60.0|
 +-----------------------------+---------+-----------------------------------------------+-------------------------------------------+-------------------------------------+
+```
+## 会话分段聚合
+`GROUP BY SESSION`可以根据时间列的间隔进行分组,在结果集的时间列中,时间间隔小于等于设定阈值的数据会被分为一组。例如在工业场景中,设备并不总是连续运行,`GROUP BY SESSION`会将设备每次接入会话所产生的数据分为一组。
+其语法定义如下:
+```sql
+group by session(timeInterval)
+```
+* timeInterval
+
+设定的时间差阈值,当两条数据时间列的差值大于该阈值,则会给数据创建一个新的分组。
+
+下图为`group by session`下的一个分组示意图
+
+<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="https://raw.githubusercontent.com/apache/iotdb-bin-resources/main/docs/UserGuide/Process-Data/GroupBy/SessionGroup.jpg">
+
+### 使用注意事项
+1. 对于一个分组,默认Time列输出分组的开始时间,查询时可以使用select `__endTime`的方式来使得结果输出分组的结束时间。
+2. 与`ALIGN BY DEVICE`搭配使用时会对每个device进行单独的分组操作。
+
+对于下面的原始数据,给出几个查询样例。
+```
++-----------------------------+-----------------+-----------+--------+------+
+|                         Time|           Device|temperature|hardware|status|
++-----------------------------+-----------------+-----------+--------+------+
+|1970-01-01T08:00:01.000+08:00|root.ln.wf02.wt01|       35.7|      11| false|
+|1970-01-01T08:00:02.000+08:00|root.ln.wf02.wt01|       35.8|      22|  true|
+|1970-01-01T08:00:03.000+08:00|root.ln.wf02.wt01|       35.4|      33| false|
+|1970-01-01T08:00:04.000+08:00|root.ln.wf02.wt01|       36.4|      44| false|
+|1970-01-01T08:00:05.000+08:00|root.ln.wf02.wt01|       36.8|      55| false|
+|1970-01-01T08:00:10.000+08:00|root.ln.wf02.wt01|       36.8|     110| false|
+|1970-01-01T08:00:20.000+08:00|root.ln.wf02.wt01|       37.8|     220|  true|
+|1970-01-01T08:00:30.000+08:00|root.ln.wf02.wt01|       37.5|     330| false|
+|1970-01-01T08:00:40.000+08:00|root.ln.wf02.wt01|       37.4|     440| false|
+|1970-01-01T08:00:50.000+08:00|root.ln.wf02.wt01|       37.9|     550| false|
+|1970-01-01T08:01:40.000+08:00|root.ln.wf02.wt01|       38.0|     110| false|
+|1970-01-01T08:02:30.000+08:00|root.ln.wf02.wt01|       38.8|     220|  true|
+|1970-01-01T08:03:20.000+08:00|root.ln.wf02.wt01|       38.6|     330| false|
+|1970-01-01T08:04:20.000+08:00|root.ln.wf02.wt01|       38.4|     440| false|
+|1970-01-01T08:05:20.000+08:00|root.ln.wf02.wt01|       38.3|     550| false|
+|1970-01-01T08:06:40.000+08:00|root.ln.wf02.wt01|       null|       0|  null|
+|1970-01-01T08:07:50.000+08:00|root.ln.wf02.wt01|       null|       0|  null|
+|1970-01-01T08:08:00.000+08:00|root.ln.wf02.wt01|       null|       0|  null|
+|1970-01-02T08:08:01.000+08:00|root.ln.wf02.wt01|       38.2|     110| false|
+|1970-01-02T08:08:02.000+08:00|root.ln.wf02.wt01|       37.5|     220|  true|
+|1970-01-02T08:08:03.000+08:00|root.ln.wf02.wt01|       37.4|     330| false|
+|1970-01-02T08:08:04.000+08:00|root.ln.wf02.wt01|       36.8|     440| false|
+|1970-01-02T08:08:05.000+08:00|root.ln.wf02.wt01|       37.4|     550| false|
++-----------------------------+-----------------+-----------+--------+------+
+```
+可以按照不同的时间单位设定时间间隔,sql语句如下:
+```sql
+select __endTime,count(*) from root.** group by session(1d)
+```
+得到如下结果:
+```
++-----------------------------+---------+------------------------------------+---------------------------------+-------------------------------+
+|                         Time|__endTime|count(root.ln.wf02.wt01.temperature)|count(root.ln.wf02.wt01.hardware)|count(root.ln.wf02.wt01.status)|
++-----------------------------+---------+------------------------------------+---------------------------------+-------------------------------+
+|1970-01-01T08:00:01.000+08:00|   480000|                                  15|                               18|                             15|
+|1970-01-02T08:08:01.000+08:00| 86885000|                                   5|                                5|                              5|
++-----------------------------+---------+------------------------------------+---------------------------------+-------------------------------+
+```
+也可以和`HAVING`、`ALIGN BY DEVICE`共同使用
+```sql
+select __endTime,sum(hardware) from root.ln.wf02.wt01 group by session(50s) having sum(hardware)>0 align by device
+```
+得到如下结果,其中排除了`sum(hardware)`为0的部分
+```
++-----------------------------+-----------------+---------+-------------+
+|                         Time|           Device|__endTime|sum(hardware)|
++-----------------------------+-----------------+---------+-------------+
+|1970-01-01T08:00:01.000+08:00|root.ln.wf02.wt01|   200000|       2475.0|
+|1970-01-01T08:04:20.000+08:00|root.ln.wf02.wt01|   260000|        440.0|
+|1970-01-01T08:05:20.000+08:00|root.ln.wf02.wt01|   320000|        550.0|
+|1970-01-02T08:08:01.000+08:00|root.ln.wf02.wt01| 86885000|       1650.0|
++-----------------------------+-----------------+---------+-------------+
 ```
\ No newline at end of file
diff --git a/docs/zh/UserGuide/Query-Data/Overview.md b/docs/zh/UserGuide/Query-Data/Overview.md
index c1b18adbcc..a38ead27e8 100644
--- a/docs/zh/UserGuide/Query-Data/Overview.md
+++ b/docs/zh/UserGuide/Query-Data/Overview.md
@@ -35,7 +35,8 @@ SELECT [LAST] selectExpr [, selectExpr] ...
         LEVEL = levelNum [, levelNum] ... |
         TAGS(tagKey [, tagKey] ... |
         VARIATION(expression[,delta][,ignoreNull=true/false])|
-        SERIES(expression,[keep>/>=/=/</<=]threshold[,ignoreNull=true/false])
+        SERIES(expression,[keep>/>=/=/</<=]threshold[,ignoreNull=true/false])|
+        SESSION(timeInterval)
     }]
     [HAVING havingCondition]
     [ORDER BY sortKey {ASC | DESC}]
@@ -75,7 +76,7 @@ SELECT [LAST] selectExpr [, selectExpr] ...
 ### `GROUP BY` 子句
 
 - `GROUP BY` 子句指定对序列进行分段或分组聚合的方式。
-- 分段聚合是指按照时间维度,针对同时间序列中不同数据点之间的时间关系,对数据在行的方向进行分段,每个段得到一个聚合值。目前支持**按时间区间分段**、**按事件分段**和**按事件条件分段**,未来将支持更多分段方式。
+- 分段聚合是指按照时间维度,针对同时间序列中不同数据点之间的时间关系,对数据在行的方向进行分段,每个段得到一个聚合值。目前支持**按时间区间分段**、**按事件分段**、**按事件条件分段**和**按会话时间分段**,未来将支持更多分段方式。
 - 分组聚合是指针对不同时间序列,在时间序列的潜在业务属性上分组,每个组包含若干条时间序列,每个组得到一个聚合值。支持**按路径层级分组**和**按序列标签分组**两种分组方式。
 - 分段聚合和分组聚合可以混合使用。
 - 详细说明及示例见文档 [分段分组聚合](./Group-By.md) 。
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IoTDBGroupBySessionIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IoTDBGroupBySessionIT.java
new file mode 100644
index 0000000000..6f405f6355
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IoTDBGroupBySessionIT.java
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.groupby;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBGroupBySessionIT {
+
+  private static final String[] SQLs =
+      new String[] {
+        "CREATE DATABASE root.ln.wf02.wt01",
+        "CREATE TIMESERIES root.ln.wf02.wt01.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+        "CREATE TIMESERIES root.ln.wf02.wt01.temperature WITH DATATYPE=DOUBLE, ENCODING=PLAIN",
+        "CREATE TIMESERIES root.ln.wf02.wt01.hardware WITH DATATYPE=INT32, ENCODING=PLAIN",
+        "INSERT INTO root.ln.wf02.wt01(timestamp, temperature, status, hardware) values(1000, 35.7, false, 11)",
+        "INSERT INTO root.ln.wf02.wt01(timestamp, temperature, status, hardware) values(2000, 35.8,  true, 22)",
+        "INSERT INTO root.ln.wf02.wt01(timestamp, temperature, status, hardware) values(3000, 35.4, false, 33 )",
+        "flush",
+        "INSERT INTO root.ln.wf02.wt01(timestamp, temperature, status, hardware) values(4000, 36.4, false, 44)",
+        "INSERT INTO root.ln.wf02.wt01(timestamp, temperature, status, hardware) values(5000, 36.8, false, 55)",
+        "flush",
+        "INSERT INTO root.ln.wf02.wt01(timestamp, temperature, status, hardware) values(10000, 36.8, false, 110)",
+        "INSERT INTO root.ln.wf02.wt01(timestamp, temperature, status, hardware) values(20000, 37.8,  true, 220)",
+        "INSERT INTO root.ln.wf02.wt01(timestamp, temperature, status, hardware) values(30000, 37.5, false, 330 )",
+        "INSERT INTO root.ln.wf02.wt01(timestamp, temperature, status, hardware) values(40000, 37.4, false, 440)",
+        "INSERT INTO root.ln.wf02.wt01(timestamp, temperature, status, hardware) values(50000, 37.9, false, 550)",
+        "flush",
+        "INSERT INTO root.ln.wf02.wt01(timestamp, temperature, status, hardware) values(100000, 38.0, false, 110)",
+        "INSERT INTO root.ln.wf02.wt01(timestamp, temperature, status, hardware) values(150000, 38.8,  true, 220)",
+        "flush",
+        "INSERT INTO root.ln.wf02.wt01(timestamp, temperature, status, hardware) values(200000, 38.6, false, 330 )",
+        "INSERT INTO root.ln.wf02.wt01(timestamp, temperature, status, hardware) values(260000, 38.4, false, 440)",
+        "INSERT INTO root.ln.wf02.wt01(timestamp, temperature, status, hardware) values(320000, 38.3, false, 550)",
+        "flush",
+        "INSERT INTO root.ln.wf02.wt01(timestamp, temperature, status, hardware) values(400000, null, null, 0)",
+        "INSERT INTO root.ln.wf02.wt01(timestamp, temperature, status, hardware) values(470000, null, null, 0)",
+        "INSERT INTO root.ln.wf02.wt01(timestamp, temperature, status, hardware) values(480000, null, null, 0)",
+        "flush",
+        "INSERT INTO root.ln.wf02.wt01(timestamp, temperature, status, hardware) values(86881000, 38.2, false, 110)",
+        "INSERT INTO root.ln.wf02.wt01(timestamp, temperature, status, hardware) values(86882000, 37.5,  true, 220)",
+        "INSERT INTO root.ln.wf02.wt01(timestamp, temperature, status, hardware) values(86883000, 37.4, false, 330 )",
+        "INSERT INTO root.ln.wf02.wt01(timestamp, temperature, status, hardware) values(86884000, 36.8, false, 440)",
+        "INSERT INTO root.ln.wf02.wt01(timestamp, temperature, status, hardware) values(86885000, 37.4, false, 550)",
+      };
+
+  private static final String[] SQLs2 =
+      new String[] {
+        "CREATE DATABASE root.ln.wf02.wt02",
+        "CREATE TIMESERIES root.ln.wf02.wt02.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+        "CREATE TIMESERIES root.ln.wf02.wt02.temperature WITH DATATYPE=DOUBLE, ENCODING=PLAIN",
+        "CREATE TIMESERIES root.ln.wf02.wt02.hardware WITH DATATYPE=INT32, ENCODING=PLAIN",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(1, 35.7, false, 11)",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(2, 35.8,  true, 22)",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(3, 35.4, false, 33 )",
+        "flush",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(4, 36.4, false, 44)",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(5, 36.8, false, 55)",
+        "flush",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(10, 36.8, false, 110)",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(20, 37.8,  true, 220)",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(30, 37.5, false, 330 )",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(40, 37.4, false, 440)",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(50, 37.9, false, 550)",
+        "flush",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(100, 38.0, false, 110)",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(150, 38.8,  true, 220)",
+        "flush",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(200, 38.6, false, 330 )",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(250, 38.4, false, 440)",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(300, 38.3, false, 550)",
+        "flush",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(400, null, null, 0)",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(440, null, null, 0)",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(480, null, null, 0)",
+        "flush",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(500, 38.2, false, 110)",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(510, 37.5,  true, 220)",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(520, 37.4, false, 330 )",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(530, 36.8, false, 440)",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(540, 37.4, false, 550)",
+        "flush",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(580, 37.8, false, 110)",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(590, 37.9,  true, 220)",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(600, 36.9, true, 330 )",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(610, 38.2, true, 440)",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(620, 39.2, true, 550)",
+        "flush",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(1500, 9.8, false, 666)",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(1550, 10.2, true, 888)",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(3550, 10.8, true, 999)",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(5550, 10.6, false, 1888)",
+        "INSERT INTO root.ln.wf02.wt02(timestamp, temperature, status, hardware) values(7550, 10.2, true, 2888)",
+        "flush"
+      };
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        .setEnableSeqSpaceCompaction(false)
+        .setEnableUnseqSpaceCompaction(false)
+        .setEnableCrossSpaceCompaction(false)
+        .setPartitionInterval(1000);
+    EnvFactory.getEnv().initClusterEnvironment();
+    prepareData(SQLs);
+    prepareData(SQLs2);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  private void checkHeader(ResultSetMetaData resultSetMetaData, String title) throws SQLException {
+    String[] headers = title.split(",");
+    for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+      assertEquals(headers[i - 1], resultSetMetaData.getColumnName(i));
+    }
+  }
+
+  private void normalTest(String[][] res, String sql) {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      try (ResultSet resultSet = statement.executeQuery(sql)) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        checkHeader(
+            resultSetMetaData,
+            "Time,count(root.ln.wf02.wt02.status),avg(root.ln.wf02.wt02.temperature),sum(root.ln.wf02.wt02.hardware)");
+        int count = 0;
+        while (resultSet.next()) {
+          String actualTime = resultSet.getString(1);
+          String actualCount = resultSet.getString(2);
+          double actualAvg = resultSet.getDouble(3);
+          double actualSum = resultSet.getDouble(4);
+
+          assertEquals(res[count][0], actualTime);
+          assertEquals(res[count][1], actualCount);
+          assertEquals(Double.parseDouble(res[count][2]), actualAvg, 0.01);
+          assertEquals(Double.parseDouble(res[count][3]), actualSum, 0.01);
+          count++;
+        }
+        assertEquals(res.length, count);
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void groupBySessionTest1() {
+    String[][] res =
+        new String[][] {
+          {"1", "15", "37.3067", "3465"},
+          {"400", "10", "37.73", "3300"},
+          {"1500", "2", "10", "1554"},
+          {"3550", "1", "10.8", "999"},
+          {"5550", "1", "10.6", "1888"},
+          {"7550", "1", "10.2", "2888"}
+        };
+
+    String sql =
+        "select count(status), avg(temperature), sum(hardware) from root.ln.wf02.wt02 group by session(99ms)";
+    normalTest(res, sql);
+  }
+
+  @Test
+  public void groupBySessionTest1WithHaving() {
+    String[][] res =
+        new String[][] {
+          {"1", "15", "37.3067", "3465"},
+          {"400", "10", "37.73", "3300"},
+        };
+
+    String sql =
+        "select count(status), avg(temperature), sum(hardware) from root.ln.wf02.wt02 group by session(99ms) having avg(temperature) > 30";
+    normalTest(res, sql);
+  }
+
+  @Test
+  public void groupBySessionTest2() {
+    String[][] res =
+        new String[][] {
+          {"1", "10"}, {"100", "1"}, {"150", "1"}, {"200", "1"}, {"250", "1"},
+          {"300", "1"}, {"500", "5"}, {"580", "5"}, {"1500", "1"}, {"1550", "1"},
+          {"3550", "1"}, {"5550", "1"}, {"7550", "1"}
+        };
+
+    String sql = "select count(temperature) from root.ln.wf02.wt02 group by session(10ms)";
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      try (ResultSet resultSet = statement.executeQuery(sql)) {
+        int count = 0;
+        while (resultSet.next()) {
+          String actualTime = resultSet.getString(1);
+          String actualCount = resultSet.getString(2);
+
+          assertEquals(res[count][0], actualTime);
+          assertEquals(res[count][1], actualCount);
+          count++;
+        }
+        assertEquals(res.length, count);
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void groupBySessionTest3() {
+    String[][] res =
+        new String[][] {
+          {"1", "10", "36.75", "1815"},
+          {"100", "1", "38", "110"},
+          {"150", "1", "38.8", "220"},
+          {"200", "1", "38.6", "330"},
+          {"250", "1", "38.4", "440"},
+          {"300", "1", "38.3", "550"},
+          {"400", "10", "37.73", "3300"},
+          {"1500", "1", "9.8", "666"},
+          {"1550", "1", "10.2", "888"},
+          {"3550", "1", "10.8", "999"},
+          {"5550", "1", "10.6", "1888"},
+          {"7550", "1", "10.2", "2888"}
+        };
+
+    String sql =
+        "select count(status), avg(temperature), sum(hardware) from root.ln.wf02.wt02 group by session(49ms)";
+    normalTest(res, sql);
+  }
+
+  public void groupBySessionFirstValueTest(String sql, String[][] res) {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      try (ResultSet resultSet = statement.executeQuery(sql)) {
+        int count = 0;
+        while (resultSet.next()) {
+          String actualTime = resultSet.getString(1);
+          double firstValue = resultSet.getDouble(2);
+
+          assertEquals(res[count][0], actualTime);
+          assertEquals(Double.parseDouble(res[count][1]), firstValue, 0.01);
+          count++;
+        }
+        assertEquals(res.length, count);
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void groupBySessionTest4() {
+    String[][] res =
+        new String[][] {
+          {"1", "35.7"}, {"100", "38"}, {"150", "38.8"}, {"200", "38.6"}, {"250", "38.4"},
+          {"300", "38.3"}, {"500", "38.2"}, {"580", "37.8"}, {"1500", "9.8"}, {"1550", "10.2"},
+          {"3550", "10.8"}, {"5550", "10.6"}, {"7550", "10.2"}
+        };
+
+    String sql = "select first_value(temperature) from root.ln.wf02.wt02 group by session(10ms)";
+    groupBySessionFirstValueTest(sql, res);
+  }
+
+  @Test
+  public void groupBySessionTest5() {
+    String[][] res =
+        new String[][] {
+          {"1", "27", "35.441", "8319"},
+          {"3550", "1", "10.8", "999"},
+          {"5550", "1", "10.6", "1888"},
+          {"7550", "1", "10.2", "2888"}
+        };
+
+    String sql =
+        "select count(status), avg(temperature), sum(hardware) from root.ln.wf02.wt02 group by session(1s)";
+    normalTest(res, sql);
+  }
+
+  @Test
+  public void groupBySessionTest6() {
+    String[][] res =
+        new String[][] {
+          {"7550", "1", "10.2", "2888"},
+          {"5550", "1", "10.6", "1888"},
+          {"3550", "1", "10.8", "999"},
+          {"1", "27", "35.441", "8319"}
+        };
+
+    String sql =
+        "select count(status), avg(temperature), sum(hardware) from root.ln.wf02.wt02 group by session(1s) order by time desc";
+    normalTest(res, sql);
+  }
+
+  @Test
+  public void groupBySessionTest7() {
+    String[][] res =
+        new String[][] {
+          {"7550", "1", "10.2", "2888"},
+          {"5550", "1", "10.6", "1888"},
+          {"3550", "1", "10.8", "999"},
+          {"1550", "1", "10.2", "888"},
+          {"1500", "1", "9.8", "666"},
+          {"400", "10", "37.73", "3300"},
+          {"300", "1", "38.3", "550"},
+          {"250", "1", "38.4", "440"},
+          {"200", "1", "38.6", "330"},
+          {"150", "1", "38.8", "220"},
+          {"100", "1", "38", "110"},
+          {"1", "10", "36.75", "1815"}
+        };
+
+    String sql =
+        "select count(status), avg(temperature), sum(hardware) from root.ln.wf02.wt02 group by session(49ms) order by time desc";
+    normalTest(res, sql);
+  }
+
+  @Test
+  public void groupBySessionTest8() {
+    String[][] res =
+        new String[][] {
+          {"7550", "10.2"}, {"5550", "10.6"}, {"3550", "10.8"}, {"1550", "10.2"}, {"1500", "9.8"},
+          {"580", "37.8"}, {"500", "38.2"}, {"300", "38.3"}, {"250", "38.4"}, {"200", "38.6"},
+          {"150", "38.8"}, {"100", "38"}, {"1", "35.7"}
+        };
+
+    String sql =
+        "select first_value(temperature) from root.ln.wf02.wt02 group by session(10ms) order by time desc";
+    groupBySessionFirstValueTest(sql, res);
+  }
+
+  @Test
+  public void GroupBySessionAlignByDeviceTest() {
+    String[][] res =
+        new String[][] {
+          {"1000", "200000", "13", "37.1461538462", "2475", "11"},
+          {"1", "7550", "30", "32.95", "14094", "11"}
+        };
+    String sql =
+        "select __endTime,count(status), avg(temperature), sum(hardware), first_value(hardware) from root.ln.** group by session(50s) having count(status)>5 align by device";
+    normalTestAlignByDevice(
+        res,
+        sql,
+        1,
+        "Time,Device,__endTime,count(status),avg(temperature),sum(hardware),first_value(hardware)");
+  }
+
+  @Test
+  public void GroupBySessionAlignByDeviceTest2() {
+    String[][] res =
+        new String[][] {
+          {"1000", "480000", "15", "37.3066666667", "3465", "11"},
+          {"86881000", "86885000", "5", "37.46", "1650", "110"},
+          {"1", "7550", "30", "32.95", "14094", "11"}
+        };
+    String sql =
+        "select __endTime,count(status), avg(temperature), sum(hardware), first_value(hardware) from root.ln.** group by session(1d) align by device";
+    normalTestAlignByDevice(
+        res,
+        sql,
+        2,
+        "Time,Device,__endTime,count(status),avg(temperature),sum(hardware),first_value(hardware)");
+  }
+
+  @Test
+  public void GroupBySessionAlignByDeviceTest3() {
+    String[][] res =
+        new String[][] {
+          {"1000", "200000", "11"},
+          {"260000", "260000", "440"},
+          {"320000", "320000", "550"},
+          {"400000", "400000", "0"},
+          {"470000", "480000", "0"},
+          {"86881000", "86885000", "110"},
+          {"1", "7550", "11"}
+        };
+    String sql =
+        "select __endTime,first_value(hardware) from root.ln.** group by session(50s) align by device";
+    normalTestAlignByDevice2(res, sql, 6, "Time,Device,__endTime,first_value(hardware)");
+  }
+
+  @Test
+  public void GroupBySessionAlignByDeviceTest4() {
+    String[][] res =
+        new String[][] {
+          {"1000", "480000", "18"},
+          {"86881000", "86885000", "5"},
+          {"1", "7550", "33"}
+        };
+    String sql =
+        "select __endTime,count(hardware) from root.ln.** group by session(1d) align by device";
+    normalTestAlignByDevice2(res, sql, 2, "Time,Device,__endTime,count(hardware)");
+  }
+
+  private void normalTestAlignByDevice(String[][] res, String sql, int split, String title) {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      try (ResultSet resultSet = statement.executeQuery(sql)) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        checkHeader(resultSetMetaData, title);
+        int count = 0;
+        String device = "root.ln.wf02.wt01";
+        while (resultSet.next()) {
+          if (count == split) {
+            device = "root.ln.wf02.wt02";
+          }
+          String actualTime = resultSet.getString(1);
+          String actualDevice = resultSet.getString(2);
+          String actualEndTime = resultSet.getString(3);
+          String actualCount = resultSet.getString(4);
+          double actualAvg = resultSet.getDouble(5);
+          double actualSum = resultSet.getDouble(6);
+          String actualFirstValue = resultSet.getString(7);
+
+          assertEquals(device, actualDevice);
+          assertEquals(res[count][0], actualTime);
+          assertEquals(res[count][1], actualEndTime);
+          assertEquals(res[count][2], actualCount);
+          assertEquals(Double.parseDouble(res[count][3]), actualAvg, 0.01);
+          assertEquals(Double.parseDouble(res[count][4]), actualSum, 0.01);
+          assertEquals(res[count][5], actualFirstValue);
+          count++;
+        }
+        assertEquals(res.length, count);
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  private void normalTestAlignByDevice2(String[][] res, String sql, int split, String title) {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      try (ResultSet resultSet = statement.executeQuery(sql)) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        checkHeader(resultSetMetaData, title);
+        int count = 0;
+        String device = "root.ln.wf02.wt01";
+        while (resultSet.next()) {
+          if (count == split) {
+            device = "root.ln.wf02.wt02";
+          }
+          String actualTime = resultSet.getString(1);
+          String actualDevice = resultSet.getString(2);
+          String actualEndTime = resultSet.getString(3);
+          String actualValue = resultSet.getString(4);
+
+          assertEquals(device, actualDevice);
+          assertEquals(res[count][0], actualTime);
+          assertEquals(res[count][1], actualEndTime);
+          assertEquals(res[count][2], actualValue);
+          count++;
+        }
+        assertEquals(res.length, count);
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+}
diff --git a/server/src/main/codegen/templates/evEventWindowManager.ftl b/server/src/main/codegen/templates/evEventWindowManager.ftl
index c5a205db25..1064e94295 100644
--- a/server/src/main/codegen/templates/evEventWindowManager.ftl
+++ b/server/src/main/codegen/templates/evEventWindowManager.ftl
@@ -95,8 +95,12 @@ public class ${className} extends Event${dataType.dataType?cap_first}WindowManag
         }
       }
 
-      // judge whether we need update endTime
       long currentTime = timeColumn.getLong(i);
+      // check whether startTime needs to be updated
+      if (eventWindow.getStartTime() > currentTime) {
+        eventWindow.setStartTime(currentTime);
+      }
+      // check whether endTime needs to be updated
       if (eventWindow.getEndTime() < currentTime) {
         eventWindow.setEndTime(currentTime);
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index 2624231631..222d04e288 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -67,7 +67,7 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
       long maxReturnSize,
       WindowParameter windowParameter) {
     super(operatorContext, aggregators, child, ascending, maxReturnSize);
-    this.windowManager = genWindowManager(windowParameter, timeRangeIterator);
+    this.windowManager = genWindowManager(windowParameter, timeRangeIterator, ascending);
     this.resultTsBlockBuilder = windowManager.createResultTsBlockBuilder(aggregators);
   }
 
@@ -157,11 +157,13 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
                 lastReadRowIndex,
                 aggregator.processTsBlock(inputTsBlock, windowManager.isIgnoringNull()));
       }
+
       // If lastReadRowIndex is not zero, some of tsBlock is consumed and result is cached in
       // aggregators.
       if (lastReadRowIndex != 0) {
         // todo update the keep value in group by series, it will be removed in the future
         windowManager.setKeep(lastReadRowIndex);
+        windowManager.setLastTsBlockTime();
         hasCachedDataInAggregator = true;
       }
       if (lastReadRowIndex >= inputTsBlock.getPositionCount()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindowManager.java
index 3daf6731d6..50c94da4fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindowManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindowManager.java
@@ -151,6 +151,9 @@ public interface IWindowManager {
    */
   boolean isIgnoringNull();
 
-  // todo: used for keep value temporarily,it will be removed in the future.
+  // TODO: temporarily used by "group by series" to pass the keep value; to be removed later.
   default void setKeep(long keep) {}
+
+  // TODO: temporarily used by "group by session" to keep lastTsBlockTime; to be removed later.
+  default void setLastTsBlockTime() {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindow.java
index 66d49390aa..b32238e196 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindow.java
@@ -99,6 +99,10 @@ public class SeriesWindow implements IWindow {
     this.endTime = endTime;
   }
 
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
   public void setTimeInitialized(boolean timeInitialized) {
     this.timeInitialized = timeInitialized;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindowManager.java
index 0564a991b5..d2e046b369 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindowManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindowManager.java
@@ -115,6 +115,9 @@ public class SeriesWindowManager implements IWindowManager {
       if (isFirstSkip) {
         k++;
         long currentTime = timeColumn.getLong(i);
+        if (seriesWindow.getStartTime() > currentTime) {
+          seriesWindow.setStartTime(currentTime);
+        }
         if (seriesWindow.getEndTime() < currentTime) {
           seriesWindow.setEndTime(currentTime);
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindow.java
new file mode 100644
index 0000000000..68c4aa30af
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindow.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.db.mpp.aggregation.Accumulator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public class SessionWindow implements IWindow {
+
+  private final long timeInterval;
+
+  private final boolean ascending;
+
+  private long timeValue;
+
+  private long startTime;
+
+  private long endTime;
+
+  private long lastTsBlockTime;
+
+  private boolean initializedTimeValue;
+
+  public SessionWindow(long timeInterval, boolean ascending) {
+    this.timeInterval = timeInterval;
+    this.ascending = ascending;
+  }
+
+  @Override
+  public Column getControlColumn(TsBlock tsBlock) {
+    return tsBlock.getTimeColumn();
+  }
+
+  @Override
+  public boolean satisfy(Column column, int index) {
+    if (!initializedTimeValue) {
+      return true;
+    }
+    if (index == 0) {
+      return Math.abs(column.getLong(index) - lastTsBlockTime) <= timeInterval;
+    }
+    return Math.abs(column.getLong(index) - column.getLong(index - 1)) <= timeInterval;
+  }
+
+  @Override
+  public void mergeOnePoint(Column[] controlTimeAndValueColumn, int index) {
+    long currentTime = controlTimeAndValueColumn[0].getLong(index);
+    // check whether timeValue needs to be initialized
+    if (!initializedTimeValue) {
+      startTime = currentTime;
+      endTime = currentTime;
+      lastTsBlockTime = controlTimeAndValueColumn[0].getLong(0);
+      timeValue = currentTime;
+      initializedTimeValue = true;
+      return;
+    }
+    // check whether startTime needs to be updated
+    if (startTime > currentTime) {
+      startTime = currentTime;
+    }
+    // check whether endTime needs to be updated
+    if (endTime < currentTime) {
+      endTime = currentTime;
+    }
+    // update timeValue: the largest time seen when ascending, the smallest when descending
+    timeValue = ascending ? Math.max(timeValue, currentTime) : Math.min(timeValue, currentTime);
+  }
+
+  @Override
+  public boolean hasFinalResult(Accumulator accumulator) {
+    return accumulator.hasFinalResult();
+  }
+
+  @Override
+  public boolean contains(Column column) {
+    TimeColumn timeColumn = (TimeColumn) column;
+
+    long minTime = Math.min(timeColumn.getStartTime(), timeColumn.getEndTime());
+    long maxTime = Math.max(timeColumn.getStartTime(), timeColumn.getEndTime());
+
+    boolean contains =
+        Math.abs(column.getLong(0) - lastTsBlockTime) < timeInterval
+            && maxTime - minTime <= timeInterval;
+    if (contains) {
+      if (!initializedTimeValue) {
+        startTime = Long.MAX_VALUE;
+        endTime = Long.MIN_VALUE;
+        lastTsBlockTime = column.getLong(0);
+        timeValue = ascending ? maxTime : minTime;
+        initializedTimeValue = true;
+      }
+      timeValue = ascending ? Math.max(timeValue, maxTime) : Math.min(timeValue, minTime);
+      startTime = Math.min(startTime, minTime);
+      endTime = Math.max(endTime, maxTime);
+    }
+    return contains;
+  }
+
+  public long getTimeInterval() {
+    return timeInterval;
+  }
+
+  public long getTimeValue() {
+    return timeValue;
+  }
+
+  public void setTimeValue(long timeValue) {
+    this.timeValue = timeValue;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  public long getEndTime() {
+    return endTime;
+  }
+
+  public void setEndTime(long endTime) {
+    this.endTime = endTime;
+  }
+
+  public void setInitializedTimeValue(boolean initializedTimeValue) {
+    this.initializedTimeValue = initializedTimeValue;
+  }
+
+  public long getLastTsBlockTime() {
+    return lastTsBlockTime;
+  }
+
+  public void setLastTsBlockTime(long lastTsBlockTime) {
+    this.lastTsBlockTime = lastTsBlockTime;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindowManager.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindowManager.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindowManager.java
index 0564a991b5..d8976445b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindowManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindowManager.java
@@ -19,39 +19,32 @@
 
 package org.apache.iotdb.db.mpp.execution.operator.window;
 
-import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
-import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory.KeepEvaluator;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import java.util.List;
 
-public class SeriesWindowManager implements IWindowManager {
+public class SessionWindowManager implements IWindowManager {
+
+  private final boolean isNeedOutputEndTime;
 
-  private final SeriesWindow seriesWindow;
   private boolean initialized;
+
   private boolean needSkip;
 
-  // skipPointsOutOfBound has two phrases in SeriesWindowManager.
-  // First phrase is to skip the row with the controlColumn of true in current window, which usually
-  // happens when LAST_VALUE or MAX_TIME leaves early in accumulator.
-  // Second phrase is to skip the row with the controlColumn of false/null which don't belong
-  // current window.
-  // isFirstSkip is used to identify the phrase.
-  private boolean isFirstSkip;
-  private final KeepEvaluator keepEvaluator;
-
-  public SeriesWindowManager(SeriesWindowParameter seriesWindowParameter) {
-    this.seriesWindow = new SeriesWindow(seriesWindowParameter);
+  private final SessionWindow sessionWindow;
+
+  public SessionWindowManager(boolean isNeedOutputEndTime, long timeInterval, boolean ascending) {
+    this.isNeedOutputEndTime = isNeedOutputEndTime;
+    this.initialized = false;
+    // At beginning, we do not need to skip inputTsBlock
     this.needSkip = false;
-    this.keepEvaluator =
-        AccumulatorFactory.initKeepEvaluator(seriesWindowParameter.getKeepExpression());
+    this.sessionWindow = new SessionWindow(timeInterval, ascending);
   }
 
   @Override
@@ -62,8 +55,7 @@ public class SeriesWindowManager implements IWindowManager {
   @Override
   public void initCurWindow() {
     this.initialized = true;
-    this.seriesWindow.setTimeInitialized(false);
-    this.seriesWindow.setKeep(0);
+    this.sessionWindow.setInitializedTimeValue(false);
   }
 
   @Override
@@ -73,14 +65,16 @@ public class SeriesWindowManager implements IWindowManager {
 
   @Override
   public void next() {
+    // When moving to the next window, check whether all points belonging to the previous window
+    // have been consumed. If not, we need to skip these points.
     this.needSkip = true;
     this.initialized = false;
-    isFirstSkip = true;
+    this.sessionWindow.setLastTsBlockTime(0);
   }
 
   @Override
   public IWindow getCurWindow() {
-    return seriesWindow;
+    return sessionWindow;
   }
 
   @Override
@@ -93,54 +87,49 @@ public class SeriesWindowManager implements IWindowManager {
       return inputTsBlock;
     }
 
-    Column controlColumn = seriesWindow.getControlColumn(inputTsBlock);
     TimeColumn timeColumn = inputTsBlock.getTimeColumn();
     int i = 0, size = inputTsBlock.getPositionCount();
-    int k = 0;
-    for (; i < size; i++) {
-
-      // if ignoreNull is true, ignore the controlColumn of null
-      if (isIgnoringNull() && controlColumn.isNull(i)) continue;
+    long previousTimeValue = sessionWindow.getTimeValue();
 
-      // the first phrase of skip
-      if (isFirstSkip && (controlColumn.isNull(i) || !controlColumn.getBoolean(i))) {
-        break;
-        // the second phrase of skip
-      } else if (!isFirstSkip && !controlColumn.isNull(i) && controlColumn.getBoolean(i)) {
+    for (; i < size; i++) {
+      long currentTime = timeColumn.getLong(i);
+      if (Math.abs(currentTime - previousTimeValue) > sessionWindow.getTimeInterval()) {
+        sessionWindow.setTimeValue(previousTimeValue);
         break;
       }
-
-      // update endTime and record the row processed, only the first phrase of skip in current
-      // window need to record them.
-      if (isFirstSkip) {
-        k++;
-        long currentTime = timeColumn.getLong(i);
-        if (seriesWindow.getEndTime() < currentTime) {
-          seriesWindow.setEndTime(currentTime);
-        }
+      // judge whether we need to update startTime
+      if (sessionWindow.getStartTime() > currentTime) {
+        sessionWindow.setStartTime(currentTime);
       }
+      // judge whether we need to update endTime
+      if (sessionWindow.getEndTime() < currentTime) {
+        sessionWindow.setEndTime(currentTime);
+      }
+      previousTimeValue = currentTime;
     }
 
-    // record the row processed in the first phrase of skip. If the tsBlock is null, the skip may
-    // not finish.
-    if (isFirstSkip) {
-      if (i != size) isFirstSkip = false;
-      seriesWindow.setKeep(seriesWindow.getKeep() + k);
-      return inputTsBlock.subTsBlock(i);
-    }
-
+    // we can create a new window beginning at index i of inputTsBlock
     if (i < size) {
-      // we can create a new window beginning at index i of inputTsBlock
       needSkip = false;
     }
     return inputTsBlock.subTsBlock(i);
   }
 
+  @Override
+  public boolean satisfiedCurWindow(TsBlock inputTsBlock) {
+    return true;
+  }
+
+  @Override
+  public boolean isTsBlockOutOfBound(TsBlock inputTsBlock) {
+    return false;
+  }
+
   @Override
   public TsBlockBuilder createResultTsBlockBuilder(List<Aggregator> aggregators) {
     List<TSDataType> dataTypes = getResultDataTypes(aggregators);
     // Judge whether we need output endTime column.
-    if (seriesWindow.isOutputEndTime()) {
+    if (isNeedOutputEndTime) {
       dataTypes.add(0, TSDataType.INT64);
     }
     return new TsBlockBuilder(dataTypes);
@@ -149,18 +138,14 @@ public class SeriesWindowManager implements IWindowManager {
   @Override
   public void appendAggregationResult(
       TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
-    if (!keepEvaluator.apply(seriesWindow.getKeep())) {
-      for (Aggregator aggregator : aggregators) aggregator.reset();
-      return;
-    }
-    // Use the start time of eventWindow as default output time.
+    // Use the start time of sessionWindow as default output time.
     TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
-    timeColumnBuilder.writeLong(seriesWindow.getStartTime());
+    timeColumnBuilder.writeLong(sessionWindow.getStartTime());
 
     ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
     int columnIndex = 0;
-    if (seriesWindow.isOutputEndTime()) {
-      columnBuilders[0].writeLong(seriesWindow.getEndTime());
+    if (isNeedOutputEndTime) {
+      columnBuilders[0].writeLong(sessionWindow.getEndTime());
       columnIndex = 1;
     }
     for (Aggregator aggregator : aggregators) {
@@ -174,18 +159,23 @@ public class SeriesWindowManager implements IWindowManager {
     resultTsBlockBuilder.declarePosition();
   }
 
+  @Override
+  public boolean notInitedLastTimeWindow() {
+    return false;
+  }
+
   @Override
   public boolean needSkipInAdvance() {
-    return true;
+    return isNeedOutputEndTime;
   }
 
   @Override
   public boolean isIgnoringNull() {
-    return seriesWindow.ignoringNull();
+    return false;
   }
 
   @Override
-  public void setKeep(long keep) {
-    seriesWindow.setKeep(seriesWindow.getKeep() + keep);
+  public void setLastTsBlockTime() {
+    this.sessionWindow.setLastTsBlockTime(this.sessionWindow.getTimeValue());
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindowParameter.java
similarity index 71%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindowParameter.java
index 55ee21c58c..26959c4df9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindowParameter.java
@@ -19,18 +19,17 @@
 
 package org.apache.iotdb.db.mpp.execution.operator.window;
 
-public enum WindowType {
-  TIME_WINDOW((byte) 0),
-  EVENT_WINDOW((byte) 1),
-  SERIES_WINDOW((byte) 2);
+public class SessionWindowParameter extends WindowParameter {
 
-  private final byte type;
+  private final long timeInterval;
 
-  WindowType(byte type) {
-    this.type = type;
+  public SessionWindowParameter(long timeInterval, boolean needOutputEndTime) {
+    super(needOutputEndTime);
+    this.timeInterval = timeInterval;
+    this.windowType = WindowType.SESSION_WINDOW;
   }
 
-  public byte getType() {
-    return type;
+  public long getTimeInterval() {
+    return timeInterval;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowManagerFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowManagerFactory.java
index 54d88afeb5..eaeba4d246 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowManagerFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowManagerFactory.java
@@ -25,18 +25,21 @@ import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 public class WindowManagerFactory {
 
   public static IWindowManager genWindowManager(
-      WindowParameter windowParameter, ITimeRangeIterator timeRangeIterator) {
+      WindowParameter windowParameter, ITimeRangeIterator timeRangeIterator, boolean ascending) {
     switch (windowParameter.getWindowType()) {
       case TIME_WINDOW:
         return new TimeWindowManager(timeRangeIterator, (TimeWindowParameter) windowParameter);
       case EVENT_WINDOW:
         return ((EventWindowParameter) windowParameter).getDelta() == 0
-            ? genEqualEventWindowManager(
-                (EventWindowParameter) windowParameter, timeRangeIterator.isAscending())
-            : genVariationEventWindowManager(
-                (EventWindowParameter) windowParameter, timeRangeIterator.isAscending());
+            ? genEqualEventWindowManager((EventWindowParameter) windowParameter, ascending)
+            : genVariationEventWindowManager((EventWindowParameter) windowParameter, ascending);
       case SERIES_WINDOW:
         return new SeriesWindowManager((SeriesWindowParameter) windowParameter);
+      case SESSION_WINDOW:
+        return new SessionWindowManager(
+            windowParameter.isNeedOutputEndTime(),
+            ((SessionWindowParameter) windowParameter).getTimeInterval(),
+            ascending);
       default:
         throw new IllegalArgumentException(
             "Not support this type of aggregation window :"
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java
index 55ee21c58c..e2ff0f8241 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java
@@ -22,7 +22,8 @@ package org.apache.iotdb.db.mpp.execution.operator.window;
 public enum WindowType {
   TIME_WINDOW((byte) 0),
   EVENT_WINDOW((byte) 1),
-  SERIES_WINDOW((byte) 2);
+  SERIES_WINDOW((byte) 2),
+  SESSION_WINDOW((byte) 3);
 
   private final byte type;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index f8e7874dd3..ce1488490b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -69,6 +69,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.DeviceViewIntoPathDes
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupBySeriesParameter;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupBySessionParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByVariationParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor;
@@ -79,6 +80,7 @@ import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.mpp.plan.statement.component.FillComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupBySeriesComponent;
+import org.apache.iotdb.db.mpp.plan.statement.component.GroupBySessionComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByVariationComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.IntoComponent;
@@ -888,7 +890,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
           // we need to reconsider the process of it
           sourceTransformExpressions.add(expression.getExpressions().get(0));
         }
-        if (analysis.hasGroupByParameter()) {
+        if (queryStatement.hasGroupByExpression()) {
           sourceTransformExpressions.add(analysis.getDeviceToGroupByExpression().get(deviceName));
         }
         deviceToSourceTransformExpressions.put(deviceName, sourceTransformExpressions);
@@ -1205,6 +1207,11 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
           new GroupBySeriesParameter(groupByComponent.isIgnoringNull(), keepExpression);
       analysis.setGroupByParameter(groupByParameter);
       analysis.setDeviceToGroupByExpression(deviceToGroupByExpression);
+    } else if (windowType == WindowType.SESSION_WINDOW) {
+      GroupByParameter groupByParameter =
+          new GroupBySessionParameter(
+              ((GroupBySessionComponent) groupByComponent).getTimeInterval());
+      analysis.setGroupByParameter(groupByParameter);
     } else {
       throw new SemanticException("Unsupported window type");
     }
@@ -1251,6 +1258,10 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
           new GroupBySeriesParameter(groupByComponent.isIgnoringNull(), keepExpression);
       analysis.setGroupByExpression(groupByExpression);
       analysis.setGroupByParameter(groupByParameter);
+    } else if (windowType == WindowType.SESSION_WINDOW) {
+      long interval = ((GroupBySessionComponent) groupByComponent).getTimeInterval();
+      GroupByParameter groupByParameter = new GroupBySessionParameter(interval);
+      analysis.setGroupByParameter(groupByParameter);
     } else {
       throw new SemanticException("Unsupported window type");
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 1d5456683d..dced126e8c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -73,6 +73,7 @@ import org.apache.iotdb.db.mpp.plan.statement.component.FromComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByLevelComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupBySeriesComponent;
+import org.apache.iotdb.db.mpp.plan.statement.component.GroupBySessionComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTagComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByVariationComponent;
@@ -216,6 +217,9 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
       "For delete statement, where clause can only contain time expressions, "
           + "value filter is not currently supported.";
 
+  private static final String GROUP_BY_COMMON_ONLY_ONE_MSG =
+      "Only one of group by time or group by variation/series/session can be supported at a time";
+
   private static final String IGNORENULL = "IgnoreNull";
 
   private ZoneId zoneId;
@@ -935,8 +939,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
       for (IoTDBSqlParser.GroupByAttributeClauseContext groupByAttribute : groupByAttributes) {
         if (groupByAttribute.TIME() != null || groupByAttribute.interval != null) {
           if (groupByKeys.contains("COMMON")) {
-            throw new SemanticException(
-                "Only one of group by time or group by variation/series can be supported at a time");
+            throw new SemanticException(GROUP_BY_COMMON_ONLY_ONE_MSG);
           }
 
           groupByKeys.add("COMMON");
@@ -957,8 +960,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
           queryStatement.setGroupByTagComponent(parseGroupByTagClause(groupByAttribute));
         } else if (groupByAttribute.VARIATION() != null) {
           if (groupByKeys.contains("COMMON")) {
-            throw new SemanticException(
-                "Only one of group by time or group by variation/series can be supported at a time");
+            throw new SemanticException(GROUP_BY_COMMON_ONLY_ONE_MSG);
           }
 
           groupByKeys.add("COMMON");
@@ -966,13 +968,20 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
               parseGroupByClause(groupByAttribute, WindowType.EVENT_WINDOW));
         } else if (groupByAttribute.SERIES() != null) {
           if (groupByKeys.contains("COMMON")) {
-            throw new SemanticException(
-                "Only one of group by time or group by variation/series can be supported at a time");
+            throw new SemanticException(GROUP_BY_COMMON_ONLY_ONE_MSG);
           }
 
           groupByKeys.add("COMMON");
           queryStatement.setGroupByComponent(
               parseGroupByClause(groupByAttribute, WindowType.SERIES_WINDOW));
+        } else if (groupByAttribute.SESSION() != null) {
+          if (groupByKeys.contains("COMMON")) {
+            throw new SemanticException(GROUP_BY_COMMON_ONLY_ONE_MSG);
+          }
+
+          groupByKeys.add("COMMON");
+          queryStatement.setGroupByComponent(
+              parseGroupByClause(groupByAttribute, WindowType.SESSION_WINDOW));
         } else {
           throw new SemanticException("Unknown GROUP BY type.");
         }
@@ -1222,6 +1231,9 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
       }
       groupBySeriesComponent.setIgnoringNull(ignoringNull);
       return groupBySeriesComponent;
+    } else if (windowType == WindowType.SESSION_WINDOW) {
+      long interval = DateTimeUtils.convertDurationStrToLong(ctx.timeInterval.getText());
+      return new GroupBySessionComponent(interval);
     } else {
       throw new SemanticException("Unsupported window type");
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 4e1338420b..dd4803d6bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -120,6 +120,7 @@ import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.ShowQueriesOperator;
 import org.apache.iotdb.db.mpp.execution.operator.window.EventWindowParameter;
 import org.apache.iotdb.db.mpp.execution.operator.window.SeriesWindowParameter;
+import org.apache.iotdb.db.mpp.execution.operator.window.SessionWindowParameter;
 import org.apache.iotdb.db.mpp.execution.operator.window.TimeWindowParameter;
 import org.apache.iotdb.db.mpp.execution.operator.window.WindowParameter;
 import org.apache.iotdb.db.mpp.execution.operator.window.WindowType;
@@ -184,6 +185,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.DeviceViewIntoPathDes
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupBySeriesParameter;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupBySessionParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByVariationParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
@@ -1451,7 +1453,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
       // groupByParameter and groupByTimeParameter
       if (groupByParameter != null) {
         WindowType windowType = groupByParameter.getWindowType();
-        WindowParameter windowParameter = null;
+
+        WindowParameter windowParameter;
         switch (windowType) {
           case EVENT_WINDOW:
             String controlColumn = node.getGroupByExpression().getExpressionString();
@@ -1461,20 +1464,26 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                     controlColumnType,
                     layout.get(controlColumn).get(0).getValueColumnIndex(),
                     node.isOutputEndTime(),
-                    groupByParameter.isIgnoringNull(),
+                    ((GroupByVariationParameter) groupByParameter).isIgnoringNull(),
                     ((GroupByVariationParameter) groupByParameter).getDelta());
             break;
           case SERIES_WINDOW:
             windowParameter =
                 new SeriesWindowParameter(
                     node.isOutputEndTime(),
-                    groupByParameter.isIgnoringNull(),
+                    ((GroupBySeriesParameter) groupByParameter).isIgnoringNull(),
                     layout
                         .get(node.getGroupByExpression().getExpressionString())
                         .get(0)
                         .getValueColumnIndex(),
                     ((GroupBySeriesParameter) groupByParameter).getKeepExpression());
             break;
+          case SESSION_WINDOW:
+            windowParameter =
+                new SessionWindowParameter(
+                    ((GroupBySessionParameter) groupByParameter).getTimeInterval(),
+                    node.isOutputEndTime());
+            break;
           default:
             throw new IllegalArgumentException("Unsupported window type");
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByParameter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByParameter.java
index 5f714579c9..3380ea6a0d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByParameter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByParameter.java
@@ -33,34 +33,25 @@ public abstract class GroupByParameter {
 
   protected WindowType windowType;
 
-  protected boolean ignoringNull;
-
-  public GroupByParameter(WindowType windowType, boolean ignoringNull) {
+  public GroupByParameter(WindowType windowType) {
     this.windowType = windowType;
-    this.ignoringNull = ignoringNull;
   }
 
   public WindowType getWindowType() {
     return windowType;
   }
 
-  public boolean isIgnoringNull() {
-    return ignoringNull;
-  }
-
   protected abstract void serializeAttributes(ByteBuffer byteBuffer);
 
   protected abstract void serializeAttributes(DataOutputStream stream) throws IOException;
 
   public void serialize(ByteBuffer buffer) {
     ReadWriteIOUtils.write(windowType.getType(), buffer);
-    ReadWriteIOUtils.write(ignoringNull, buffer);
     serializeAttributes(buffer);
   }
 
   public void serialize(DataOutputStream stream) throws IOException {
     ReadWriteIOUtils.write(windowType.getType(), stream);
-    ReadWriteIOUtils.write(ignoringNull, stream);
     serializeAttributes(stream);
   }
 
@@ -70,12 +61,12 @@ public abstract class GroupByParameter {
       return false;
     }
     GroupByParameter other = (GroupByParameter) obj;
-    return this.windowType == other.windowType && this.ignoringNull == other.ignoringNull;
+    return this.windowType == other.windowType;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(windowType, ignoringNull);
+    return Objects.hash(windowType);
   }
 
   public static GroupByParameter deserialize(ByteBuffer byteBuffer) {
@@ -84,6 +75,8 @@ public abstract class GroupByParameter {
       return GroupByVariationParameter.deserialize(byteBuffer);
     } else if (type == WindowType.SERIES_WINDOW.getType()) {
       return GroupBySeriesParameter.deserialize(byteBuffer);
+    } else if (type == WindowType.SESSION_WINDOW.getType()) {
+      return GroupBySessionParameter.deserialize(byteBuffer);
     } else throw new SemanticException("Unsupported window type");
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupBySeriesParameter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupBySeriesParameter.java
index 8ce7eb1d72..f0ed735779 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupBySeriesParameter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupBySeriesParameter.java
@@ -31,19 +31,23 @@ import java.util.Objects;
 public class GroupBySeriesParameter extends GroupByParameter {
 
   private final Expression keepExpression;
+  private boolean ignoringNull;
 
   public GroupBySeriesParameter(boolean ignoringNull, Expression keepExpression) {
-    super(WindowType.SERIES_WINDOW, ignoringNull);
+    super(WindowType.SERIES_WINDOW);
     this.keepExpression = keepExpression;
+    this.ignoringNull = ignoringNull;
   }
 
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
+    ReadWriteIOUtils.write(ignoringNull, byteBuffer);
     Expression.serialize(keepExpression, byteBuffer);
   }
 
   @Override
   protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    ReadWriteIOUtils.write(ignoringNull, stream);
     Expression.serialize(keepExpression, stream);
   }
 
@@ -57,6 +61,10 @@ public class GroupBySeriesParameter extends GroupByParameter {
     return keepExpression;
   }
 
+  public boolean isIgnoringNull() {
+    return ignoringNull;
+  }
+
   @Override
   public boolean equals(Object obj) {
     if (this == obj) {
@@ -68,11 +76,12 @@ public class GroupBySeriesParameter extends GroupByParameter {
     if (!super.equals(obj)) {
       return false;
     }
-    return this.keepExpression == ((GroupBySeriesParameter) obj).getKeepExpression();
+    return this.keepExpression == ((GroupBySeriesParameter) obj).getKeepExpression()
+        && this.ignoringNull == ((GroupBySeriesParameter) obj).ignoringNull;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), keepExpression);
+    return Objects.hash(super.hashCode(), keepExpression, ignoringNull);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByVariationParameter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupBySessionParameter.java
similarity index 67%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByVariationParameter.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupBySessionParameter.java
index d391a3a4cd..5f554bab22 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByVariationParameter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupBySessionParameter.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -27,37 +27,32 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
-public class GroupByVariationParameter extends GroupByParameter {
+public class GroupBySessionParameter extends GroupByParameter {
 
-  double delta;
+  private final long timeInterval;
 
-  public GroupByVariationParameter(boolean ignoringNull, double delta) {
-    super(WindowType.EVENT_WINDOW, ignoringNull);
-    this.delta = delta;
+  public GroupBySessionParameter(long timeInterval) {
+    super(WindowType.SESSION_WINDOW);
+    this.timeInterval = timeInterval;
   }
 
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
-    ReadWriteIOUtils.write(delta, byteBuffer);
+    ReadWriteIOUtils.write(timeInterval, byteBuffer);
   }
 
   @Override
   protected void serializeAttributes(DataOutputStream stream) throws IOException {
-    ReadWriteIOUtils.write(delta, stream);
+    ReadWriteIOUtils.write(timeInterval, stream);
   }
 
-  public void setDelta(double delta) {
-    this.delta = delta;
-  }
-
-  public double getDelta() {
-    return delta;
+  public static GroupByParameter deserialize(ByteBuffer buffer) {
+    long timeInterval = ReadWriteIOUtils.readLong(buffer);
+    return new GroupBySessionParameter(timeInterval);
   }
 
-  public static GroupByParameter deserialize(ByteBuffer buffer) {
-    boolean ignoringNull = ReadWriteIOUtils.readBool(buffer);
-    double delta = ReadWriteIOUtils.readDouble(buffer);
-    return new GroupByVariationParameter(ignoringNull, delta);
+  public long getTimeInterval() {
+    return timeInterval;
   }
 
   @Override
@@ -71,11 +66,11 @@ public class GroupByVariationParameter extends GroupByParameter {
     if (!super.equals(obj)) {
       return false;
     }
-    return this.delta == ((GroupByVariationParameter) obj).getDelta();
+    return this.timeInterval == ((GroupBySessionParameter) obj).timeInterval;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), delta);
+    return Objects.hash(super.hashCode(), timeInterval);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByVariationParameter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByVariationParameter.java
index d391a3a4cd..6a397e5e33 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByVariationParameter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByVariationParameter.java
@@ -31,29 +31,34 @@ public class GroupByVariationParameter extends GroupByParameter {
 
   double delta;
 
+  boolean ignoringNull;
+
   public GroupByVariationParameter(boolean ignoringNull, double delta) {
-    super(WindowType.EVENT_WINDOW, ignoringNull);
+    super(WindowType.EVENT_WINDOW); // ignoringNull is no longer handed to the superclass
     this.delta = delta;
+    this.ignoringNull = ignoringNull; // kept on this class instead
   }
 
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
+    ReadWriteIOUtils.write(ignoringNull, byteBuffer); // written first; deserialize reads it first
     ReadWriteIOUtils.write(delta, byteBuffer);
   }
 
   @Override
   protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    ReadWriteIOUtils.write(ignoringNull, stream); // same field order as the ByteBuffer overload
     ReadWriteIOUtils.write(delta, stream);
   }
 
-  public void setDelta(double delta) {
-    this.delta = delta;
-  }
-
   public double getDelta() {
     return delta;
   }
 
+  public boolean isIgnoringNull() {
+    return ignoringNull; // flag supplied to the constructor
+  }
+
   public static GroupByParameter deserialize(ByteBuffer buffer) {
     boolean ignoringNull = ReadWriteIOUtils.readBool(buffer);
     double delta = ReadWriteIOUtils.readDouble(buffer);
@@ -71,11 +76,12 @@ public class GroupByVariationParameter extends GroupByParameter {
     if (!super.equals(obj)) {
       return false;
     }
-    return this.delta == ((GroupByVariationParameter) obj).getDelta();
+    return this.delta == ((GroupByVariationParameter) obj).getDelta()
+        && this.ignoringNull == ((GroupByVariationParameter) obj).isIgnoringNull();
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), delta);
+    return Objects.hash(super.hashCode(), delta, ignoringNull); // both fields equals compares
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupBySessionComponent.java
similarity index 66%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupBySessionComponent.java
index 55ee21c58c..90692874bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupBySessionComponent.java
@@ -17,20 +17,20 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.execution.operator.window;
+package org.apache.iotdb.db.mpp.plan.statement.component;
 
-public enum WindowType {
-  TIME_WINDOW((byte) 0),
-  EVENT_WINDOW((byte) 1),
-  SERIES_WINDOW((byte) 2);
+import org.apache.iotdb.db.mpp.execution.operator.window.WindowType;
 
-  private final byte type;
+public class GroupBySessionComponent extends GroupByComponent { // WindowType.SESSION_WINDOW variant
 
-  WindowType(byte type) {
-    this.type = type;
+  private final long timeInterval; // assigned once in the constructor, read via getTimeInterval()
+
+  public GroupBySessionComponent(long timeInterval) {
+    super(WindowType.SESSION_WINDOW); // the window type is fixed for this component
+    this.timeInterval = timeInterval;
   }
 
-  public byte getType() {
-    return type;
+  public long getTimeInterval() {
+    return timeInterval;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
index 2fb401d6a9..ef9550a7ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
@@ -378,6 +378,9 @@ public class QueryStatement extends Statement {
       if (disableAlign()) {
         throw new SemanticException("AGGREGATION doesn't support disable align clause.");
       }
+      if (groupByComponent != null && isGroupByLevel()) {
+        throw new SemanticException("GROUP BY CLAUSES doesn't support GROUP BY LEVEL now.");
+      }
       if (isGroupByLevel() && isAlignByDevice()) {
         throw new SemanticException("GROUP BY LEVEL does not support align by device now.");
       }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
index b50d158147..083366b9c3 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComp
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.window.EventWindowParameter;
+import org.apache.iotdb.db.mpp.execution.operator.window.SessionWindowParameter;
 import org.apache.iotdb.db.mpp.execution.operator.window.TimeWindowParameter;
 import org.apache.iotdb.db.mpp.execution.operator.window.WindowParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
@@ -805,6 +806,64 @@ public class RawDataAggregationOperatorTest {
     assertEquals(resultMinTime2, 499);
   }
 
+  /**
+   * Runs MIN_TIME, MAX_TIME, FIRST_VALUE and LAST_VALUE through a RawDataAggregationOperator
+   * built with a SessionWindowParameter and expects the whole input to come back as exactly one
+   * output row.
+   */
+  @Test
+  public void groupBySessionRawDataTest1() throws IllegalPathException {
+    // expected[k][row] is the value asserted for kinds[k] on the given output row.
+    int[][] expected = new int[][] {{0}, {499}, {20000}, {10499}};
+    TAggregationType[] kinds = {
+      TAggregationType.MIN_TIME,
+      TAggregationType.MAX_TIME,
+      TAggregationType.FIRST_VALUE,
+      TAggregationType.LAST_VALUE
+    };
+
+    // Each kind is registered twice: with InputLocation(0, 0) and with InputLocation(0, 1).
+    List<TAggregationType> aggregationTypes = new ArrayList<>();
+    List<List<InputLocation[]>> inputLocations = new ArrayList<>();
+    for (TAggregationType kind : kinds) {
+      for (int slot = 0; slot < 2; slot++) {
+        aggregationTypes.add(kind);
+        List<InputLocation[]> locationsOfAggregator = new ArrayList<>();
+        locationsOfAggregator.add(new InputLocation[] {new InputLocation(0, slot)});
+        inputLocations.add(locationsOfAggregator);
+      }
+    }
+
+    WindowParameter sessionWindow = new SessionWindowParameter(2, false);
+    RawDataAggregationOperator operator =
+        initRawDataAggregationOperator(aggregationTypes, null, inputLocations, sessionWindow);
+
+    int rowsSeen = 0;
+    while (operator.hasNext()) {
+      TsBlock block = operator.next();
+      if (block == null) {
+        // Nothing to check for this call; ask the operator again.
+        continue;
+      }
+      for (int row = 0; row < block.getPositionCount(); row++, rowsSeen++) {
+        // Output columns 2k and 2k + 1 are both asserted against expected[k].
+        for (int col = 0; col < 2 * kinds.length; col++) {
+          int want = expected[col / 2][rowsSeen];
+          if (col < 4) {
+            // MIN_TIME / MAX_TIME results are read as long.
+            assertEquals(want, block.getColumn(col).getLong(row));
+          } else {
+            // FIRST_VALUE / LAST_VALUE results are read as int.
+            assertEquals(want, block.getColumn(col).getInt(row));
+          }
+        }
+      }
+    }
+
+    // A single output row means a single session window was produced.
+    assertEquals(1, rowsSeen);
+  }
+
   private RawDataAggregationOperator initRawDataAggregationOperator(
       List<TAggregationType> aggregationTypes,
       GroupByTimeParameter groupByTimeParameter,