You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/03/16 01:37:17 UTC

[iotdb] branch master updated: [IOTDB-4439] Support GROUP BY COUNT in aggregation query

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 687da0ec0d [IOTDB-4439] Support GROUP BY COUNT in aggregation query
687da0ec0d is described below

commit 687da0ec0d4b75f91861f45894f505a9a76cdd68
Author: YangCaiyin <yc...@gmail.com>
AuthorDate: Thu Mar 16 09:37:09 2023 +0800

    [IOTDB-4439] Support GROUP BY COUNT in aggregation query
---
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |   1 +
 docs/UserGuide/Query-Data/Group-By.md              |  67 ++++
 docs/UserGuide/Query-Data/Overview.md              |  11 +-
 docs/zh/UserGuide/Query-Data/Group-By.md           |  66 ++++
 docs/zh/UserGuide/Query-Data/Overview.md           |   9 +-
 .../iotdb/db/it/groupby/IoTDBGroupByCountIT.java   | 407 +++++++++++++++++++++
 .../operator/window/AbstractVariationWindow.java   |   6 -
 .../execution/operator/window/ConditionWindow.java |   6 -
 ...stractVariationWindow.java => CountWindow.java} |  80 ++--
 .../operator/window/CountWindowManager.java        | 141 +++++++
 .../{WindowType.java => CountWindowParameter.java} |  33 +-
 .../db/mpp/execution/operator/window/IWindow.java  |  19 +-
 .../execution/operator/window/SessionWindow.java   |   6 -
 .../mpp/execution/operator/window/TimeWindow.java  |   8 -
 .../operator/window/WindowManagerFactory.java      |   2 +
 .../mpp/execution/operator/window/WindowType.java  |   3 +-
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  24 +-
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       |  18 +
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  17 +
 .../plan/parameter/GroupByCountParameter.java      |  85 +++++
 .../component/GroupByCountComponent.java}          |  23 +-
 .../db/mpp/plan/statement/crud/QueryStatement.java |   6 +-
 22 files changed, 921 insertions(+), 117 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 064d1151fd..b5173fe252 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -498,6 +498,7 @@ groupByAttributeClause
     | VARIATION LR_BRACKET expression (COMMA delta=number)? (COMMA attributePair)? RR_BRACKET
     | CONDITION LR_BRACKET expression (COMMA expression)? (COMMA attributePair)? RR_BRACKET
     | SESSION LR_BRACKET timeInterval=DURATION_LITERAL RR_BRACKET
+    | COUNT LR_BRACKET expression COMMA countNumber=INTEGER_LITERAL (COMMA attributePair)? RR_BRACKET
     ;
 
 number
diff --git a/docs/UserGuide/Query-Data/Group-By.md b/docs/UserGuide/Query-Data/Group-By.md
index ecb07e65a6..c6f5196a02 100644
--- a/docs/UserGuide/Query-Data/Group-By.md
+++ b/docs/UserGuide/Query-Data/Group-By.md
@@ -854,4 +854,71 @@ Get the result below:
 |1970-01-01T08:05:20.000+08:00|root.ln.wf02.wt01|1970-01-01T08:05:20.000+08:00|        550.0|
 |1970-01-02T08:08:01.000+08:00|root.ln.wf02.wt01|1970-01-02T08:08:05.000+08:00|       1650.0|
 +-----------------------------+-----------------+-----------------------------+-------------+
+```
+## Aggregation By Count
+`GROUP BY COUNT` aggregates data points according to the number of points: it groups a fixed number of consecutive data points together for an aggregation query.
+Its syntax is defined as follows:
+```sql
+group by count(controlExpression, size[,ignoreNull=true/false])
+```
+
+* controlExpression
+
+The object to count during processing, it can be any column or an expression of columns.
+
+* size
+
+The number of data points in a group; every `size` consecutive points are placed in the same group.
+
+* ignoreNull=true/false
+
+Whether to ignore the data points with null in `controlExpression`, when ignoreNull is true, data points with the `controlExpression` of null will be skipped during counting.
+
+### Precautions for Use
+1. For a group in resultSet, the time column outputs the start time of the group by default. `__endTime` can be used in the select clause to output the end time of each group in resultSet.
+2. Each device is grouped separately when used with `ALIGN BY DEVICE`.
+3. Currently `GROUP BY COUNT` is not supported with `GROUP BY LEVEL`.
+4. When the final number of data points in a group is less than `size`, the result of the group will not be output.
+
+For the data below, some examples will be given.
+```
++-----------------------------+-----------+-----------------------+
+|                         Time|root.sg.soc|root.sg.charging_status|
++-----------------------------+-----------+-----------------------+
+|1970-01-01T08:00:00.001+08:00|       14.0|                      1|                                   
+|1970-01-01T08:00:00.002+08:00|       16.0|                      1|                                 
+|1970-01-01T08:00:00.003+08:00|       16.0|                      0|                                   
+|1970-01-01T08:00:00.004+08:00|       16.0|                      0|                                   
+|1970-01-01T08:00:00.005+08:00|       18.0|                      1|                                   
+|1970-01-01T08:00:00.006+08:00|       24.0|                      1|                                   
+|1970-01-01T08:00:00.007+08:00|       36.0|                      1|                                   
+|1970-01-01T08:00:00.008+08:00|       36.0|                   null|                                   
+|1970-01-01T08:00:00.009+08:00|       45.0|                      1|                                   
+|1970-01-01T08:00:00.010+08:00|       60.0|                      1|
++-----------------------------+-----------+-----------------------+
+```
+The sql is shown below
+```sql
+select count(charging_status), first_value(soc) from root.sg group by count(charging_status,5) 
+```
+The result is shown below. The second group, from 1970-01-01T08:00:00.006+08:00 to 1970-01-01T08:00:00.010+08:00, contains only four non-null points, which is fewer than `size`, so it is not output.
+```
++-----------------------------+-----------------------------+--------------------------------------+
+|                         Time|                    __endTime|first_value(root.sg.beijing.car01.soc)|
++-----------------------------+-----------------------------+--------------------------------------+
+|1970-01-01T08:00:00.001+08:00|1970-01-01T08:00:00.005+08:00|                                  14.0|
++-----------------------------+-----------------------------+--------------------------------------+
+```
+When `ignoreNull=false` is used, null values are also counted, so the resultSet contains two groups of 5 points, as shown below:
+```sql
+select count(charging_status), first_value(soc) from root.sg group by count(charging_status,5,ignoreNull=false) 
+```
+Get the results:
+```
++-----------------------------+-----------------------------+--------------------------------------+
+|                         Time|                    __endTime|first_value(root.sg.beijing.car01.soc)|
++-----------------------------+-----------------------------+--------------------------------------+
+|1970-01-01T08:00:00.001+08:00|1970-01-01T08:00:00.005+08:00|                                  14.0|
+|1970-01-01T08:00:00.006+08:00|1970-01-01T08:00:00.010+08:00|                                  24.0|
++-----------------------------+-----------------------------+--------------------------------------+
 ```
\ No newline at end of file
diff --git a/docs/UserGuide/Query-Data/Overview.md b/docs/UserGuide/Query-Data/Overview.md
index ef8d57d84b..0d6a652507 100644
--- a/docs/UserGuide/Query-Data/Overview.md
+++ b/docs/UserGuide/Query-Data/Overview.md
@@ -33,10 +33,11 @@ SELECT [LAST] selectExpr [, selectExpr] ...
     [GROUP BY {
         ([startTime, endTime), interval [, slidingStep]) |
         LEVEL = levelNum [, levelNum] ... |
-        TAGS(tagKey [, tagKey] ... )
-        VARIATION(expression[,delta][,ignoreNull=true/false])|
-        CONDITION(expression,[keep>/>=/=/</<=]threshold[,ignoreNull=true/false])|
-        SESSION(timeInterval)
+        TAGS(tagKey [, tagKey] ... ) |
+        VARIATION(expression[,delta][,ignoreNull=true/false]) |
+        CONDITION(expression,[keep>/>=/=/</<=]threshold[,ignoreNull=true/false]) |
+        SESSION(timeInterval) |
+        COUNT(expression, size[,ignoreNull=true/false])
     }]
     [HAVING havingCondition]
     [ORDER BY sortKey {ASC | DESC}]
@@ -76,7 +77,7 @@ SELECT [LAST] selectExpr [, selectExpr] ...
 ### `GROUP BY` clause
 
 - The `GROUP BY` clause specifies how the time series are aggregated by segment or group.
-- Segmented aggregation refers to segmenting data in the row direction according to the time dimension, aiming at the time relationship between different data points in the same time series, and obtaining an aggregated value for each segment. Currently only **segmentation by time interval**、**group by variation**、**group by series** and **group by session** is supported, and more segmentation methods will be supported in the future.
+- Segmented aggregation refers to segmenting data in the row direction according to the time dimension, aiming at the time relationship between different data points in the same time series, and obtaining an aggregated value for each segment. Currently only **segmentation by time interval**, **group by variation**, **group by series**, **group by session** and **group by count** are supported, and more segmentation methods will be supported in the future.
 - Group aggregation refers to grouping the potential business attributes of time series for different time series. Each group contains several time series, and each group gets an aggregated value. Support **group by path level** and **group by tag** two grouping methods.
 - Segment aggregation and group aggregation can be mixed.
 - For details and examples, see the document [Group By Aggregation](./Group-By.md).
diff --git a/docs/zh/UserGuide/Query-Data/Group-By.md b/docs/zh/UserGuide/Query-Data/Group-By.md
index 68cfb25e53..8f8ba23af0 100644
--- a/docs/zh/UserGuide/Query-Data/Group-By.md
+++ b/docs/zh/UserGuide/Query-Data/Group-By.md
@@ -836,4 +836,70 @@ select __endTime,sum(hardware) from root.ln.wf02.wt01 group by session(50s) havi
 |1970-01-01T08:05:20.000+08:00|root.ln.wf02.wt01|1970-01-01T08:05:20.000+08:00|        550.0|
 |1970-01-02T08:08:01.000+08:00|root.ln.wf02.wt01|1970-01-02T08:08:05.000+08:00|       1650.0|
 +-----------------------------+-----------------+-----------------------------+-------------+
+```
+## 点数分段聚合
+`GROUP BY COUNT`可以根据点数分组进行聚合运算,将连续的指定数量数据点分为一组,即按照固定的点数进行分组。
+其语法定义如下:
+```sql
+group by count(controlExpression, size[,ignoreNull=true/false])
+```
+* controlExpression
+
+计数参照的对象,可以是结果集的任意列或是列的表达式
+
+* size
+
+一个组中数据点的数量,每`size`个数据点会被分到同一个组
+
+* ignoreNull=true/false
+
+是否忽略`controlExpression`为null的数据点,当ignoreNull为true时,在计数时会跳过`controlExpression`结果为null的数据点
+
+### 使用注意事项
+1. 对于一个分组,默认Time列输出分组的开始时间,查询时可以使用select `__endTime`的方式来使得结果输出分组的结束时间。
+2. 与`ALIGN BY DEVICE`搭配使用时会对每个device进行单独的分组操作。
+3. 当前暂不支持与`GROUP BY LEVEL`搭配使用。
+4. 当一个分组内最终的点数不满足`size`的数量时,不会输出该分组的结果
+
+对于下面的原始数据,给出几个查询样例。
+```
++-----------------------------+-----------+-----------------------+
+|                         Time|root.sg.soc|root.sg.charging_status|
++-----------------------------+-----------+-----------------------+
+|1970-01-01T08:00:00.001+08:00|       14.0|                      1|                                   
+|1970-01-01T08:00:00.002+08:00|       16.0|                      1|                                 
+|1970-01-01T08:00:00.003+08:00|       16.0|                      0|                                   
+|1970-01-01T08:00:00.004+08:00|       16.0|                      0|                                   
+|1970-01-01T08:00:00.005+08:00|       18.0|                      1|                                   
+|1970-01-01T08:00:00.006+08:00|       24.0|                      1|                                   
+|1970-01-01T08:00:00.007+08:00|       36.0|                      1|                                   
+|1970-01-01T08:00:00.008+08:00|       36.0|                   null|                                   
+|1970-01-01T08:00:00.009+08:00|       45.0|                      1|                                   
+|1970-01-01T08:00:00.010+08:00|       60.0|                      1|
++-----------------------------+-----------+-----------------------+
+```
+sql语句如下
+```sql
+select count(charging_status), first_value(soc) from root.sg group by count(charging_status,5) 
+```
+得到如下结果,其中由于第二个1970-01-01T08:00:00.006+08:00到1970-01-01T08:00:00.010+08:00的窗口中包含四个点,不符合`size = 5`的条件,因此不被输出
+```
++-----------------------------+-----------------------------+--------------------------------------+
+|                         Time|                    __endTime|first_value(root.sg.beijing.car01.soc)|
++-----------------------------+-----------------------------+--------------------------------------+
+|1970-01-01T08:00:00.001+08:00|1970-01-01T08:00:00.005+08:00|                                  14.0|
++-----------------------------+-----------------------------+--------------------------------------+
+```
+而当使用ignoreNull将null值也考虑进来时,可以得到两个点计数为5的窗口,sql如下
+```sql
+select count(charging_status), first_value(soc) from root.sg group by count(charging_status,5,ignoreNull=false) 
+```
+得到如下结果
+```
++-----------------------------+-----------------------------+--------------------------------------+
+|                         Time|                    __endTime|first_value(root.sg.beijing.car01.soc)|
++-----------------------------+-----------------------------+--------------------------------------+
+|1970-01-01T08:00:00.001+08:00|1970-01-01T08:00:00.005+08:00|                                  14.0|
+|1970-01-01T08:00:00.006+08:00|1970-01-01T08:00:00.010+08:00|                                  24.0|
++-----------------------------+-----------------------------+--------------------------------------+
 ```
\ No newline at end of file
diff --git a/docs/zh/UserGuide/Query-Data/Overview.md b/docs/zh/UserGuide/Query-Data/Overview.md
index a4cdcfd690..f99b3dfa42 100644
--- a/docs/zh/UserGuide/Query-Data/Overview.md
+++ b/docs/zh/UserGuide/Query-Data/Overview.md
@@ -35,9 +35,10 @@ SELECT [LAST] selectExpr [, selectExpr] ...
         ([startTime, endTime), interval [, slidingStep]) |
         LEVEL = levelNum [, levelNum] ... |
         TAGS(tagKey [, tagKey] ... |
-        VARIATION(expression[,delta][,ignoreNull=true/false])|
-        CONDITION(expression,[keep>/>=/=/</<=]threshold[,ignoreNull=true/false])|
-        SESSION(timeInterval)
+        VARIATION(expression[,delta][,ignoreNull=true/false]) |
+        CONDITION(expression,[keep>/>=/=/</<=]threshold[,ignoreNull=true/false]) |
+        SESSION(timeInterval) |
+        COUNT(expression, size[,ignoreNull=true/false])
     }]
     [HAVING havingCondition]
     [ORDER BY sortKey {ASC | DESC}]
@@ -77,7 +78,7 @@ SELECT [LAST] selectExpr [, selectExpr] ...
 #### `GROUP BY` 子句
 
 - `GROUP BY` 子句指定对序列进行分段或分组聚合的方式。
-- 分段聚合是指按照时间维度,针对同时间序列中不同数据点之间的时间关系,对数据在行的方向进行分段,每个段得到一个聚合值。目前支持**时间区间分段**、**差值分段**、**条件分段**和**会话分段**,未来将支持更多分段方式。
+- 分段聚合是指按照时间维度,针对同时间序列中不同数据点之间的时间关系,对数据在行的方向进行分段,每个段得到一个聚合值。目前支持**时间区间分段**、**差值分段**、**条件分段**、**会话分段**和**点数分段**,未来将支持更多分段方式。
 - 分组聚合是指针对不同时间序列,在时间序列的潜在业务属性上分组,每个组包含若干条时间序列,每个组得到一个聚合值。支持**按路径层级分组**和**按序列标签分组**两种分组方式。
 - 分段聚合和分组聚合可以混合使用。
 - 详细说明及示例见文档 [分段分组聚合](./Group-By.md) 。
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IoTDBGroupByCountIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IoTDBGroupByCountIT.java
new file mode 100644
index 0000000000..e8cafcd252
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IoTDBGroupByCountIT.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.it.groupby;
+
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class IoTDBGroupByCountIT {
+  // Setup SQL for device root.sg.beijing.car01 (20 rows, 6 with null charging_status); the data can be viewed in
+  // https://docs.google.com/spreadsheets/d/1vsSmb41pdmK-BdBR1STwr8olg1Qc8baKVEWnfJB4mAg/edit#gid=0
+  private static final String[] SQLs =
+      new String[] {
+        "CREATE DATABASE root.sg.beijing.car01",
+        "CREATE TIMESERIES root.sg.beijing.car01.charging_status WITH DATATYPE=INT32, ENCODING=PLAIN",
+        "CREATE TIMESERIES root.sg.beijing.car01.soc WITH DATATYPE=INT64, ENCODING=PLAIN",
+        "CREATE TIMESERIES root.sg.beijing.car01.vehicle_status WITH DATATYPE=INT32, ENCODING=PLAIN",
+        "INSERT INTO root.sg.beijing.car01(timestamp, charging_status, soc, vehicle_status) values(1, 1, 14, 1)",
+        "INSERT INTO root.sg.beijing.car01(timestamp, charging_status, soc, vehicle_status) values(2, 1, 16, 1)",
+        "INSERT INTO root.sg.beijing.car01(timestamp, charging_status, soc, vehicle_status) values(3, 0, 16, 1)",
+        "INSERT INTO root.sg.beijing.car01(timestamp, charging_status, soc, vehicle_status) values(4, 0, 16, 1)",
+        "INSERT INTO root.sg.beijing.car01(timestamp, charging_status, soc, vehicle_status) values(5, 1, 18, 1)",
+        "INSERT INTO root.sg.beijing.car01(timestamp, charging_status, soc, vehicle_status) values(6, 1, 24, 1)",
+        "flush",
+        "INSERT INTO root.sg.beijing.car01(timestamp, charging_status, soc, vehicle_status) values(7, 1, 36, 1)",
+        "INSERT INTO root.sg.beijing.car01(timestamp, charging_status, soc, vehicle_status) values(8, null, 36, 1)",
+        "INSERT INTO root.sg.beijing.car01(timestamp, charging_status, soc, vehicle_status) values(9, 1, 45, 1)",
+        "INSERT INTO root.sg.beijing.car01(timestamp, charging_status, soc, vehicle_status) values(10, 1, 60, 1)",
+        "INSERT INTO root.sg.beijing.car01(timestamp, charging_status, soc, vehicle_status) values(1100000000, null, 60, 1)",
+        "INSERT INTO root.sg.beijing.car01(timestamp, charging_status, soc, vehicle_status) values(1200000000, null, 0, 0)",
+        "INSERT INTO root.sg.beijing.car01(timestamp, charging_status, soc, vehicle_status) values(1900000000, 1, 55, 1)",
+        "INSERT INTO root.sg.beijing.car01(timestamp, charging_status, soc, vehicle_status) values(2000000000, 1, 70, 1)",
+        "INSERT INTO root.sg.beijing.car01(timestamp, charging_status, soc, vehicle_status) values(2100000000, null, 70, 1)",
+        "INSERT INTO root.sg.beijing.car01(timestamp, charging_status, soc, vehicle_status) values(2200000000, null, 70, 1)",
+        "INSERT INTO root.sg.beijing.car01(timestamp, charging_status, soc, vehicle_status) values(2300000000, null, 69, 1)",
+        "INSERT INTO root.sg.beijing.car01(timestamp, charging_status, soc, vehicle_status) values(2400000000, 1, 80, 1)",
+        "INSERT INTO root.sg.beijing.car01(timestamp, charging_status, soc, vehicle_status) values(2500000000, 1, 100, 1)",
+        "INSERT INTO root.sg.beijing.car01(timestamp, charging_status, soc, vehicle_status) values(2600000000, 0, 101, 1)",
+        "flush"
+      };
+  private static final String[] SQLs2 = // same schema and rows as SQLs, but for device car02
+      new String[] {
+        "CREATE DATABASE root.sg.beijing.car02",
+        "CREATE TIMESERIES root.sg.beijing.car02.charging_status WITH DATATYPE=INT32, ENCODING=PLAIN",
+        "CREATE TIMESERIES root.sg.beijing.car02.soc WITH DATATYPE=INT64, ENCODING=PLAIN",
+        "CREATE TIMESERIES root.sg.beijing.car02.vehicle_status WITH DATATYPE=INT32, ENCODING=PLAIN",
+        "INSERT INTO root.sg.beijing.car02(timestamp, charging_status, soc, vehicle_status) values(1, 1, 14, 1)",
+        "INSERT INTO root.sg.beijing.car02(timestamp, charging_status, soc, vehicle_status) values(2, 1, 16, 1)",
+        "INSERT INTO root.sg.beijing.car02(timestamp, charging_status, soc, vehicle_status) values(3, 0, 16, 1)",
+        "INSERT INTO root.sg.beijing.car02(timestamp, charging_status, soc, vehicle_status) values(4, 0, 16, 1)",
+        "INSERT INTO root.sg.beijing.car02(timestamp, charging_status, soc, vehicle_status) values(5, 1, 18, 1)",
+        "INSERT INTO root.sg.beijing.car02(timestamp, charging_status, soc, vehicle_status) values(6, 1, 24, 1)",
+        "flush",
+        "INSERT INTO root.sg.beijing.car02(timestamp, charging_status, soc, vehicle_status) values(7, 1, 36, 1)",
+        "INSERT INTO root.sg.beijing.car02(timestamp, charging_status, soc, vehicle_status) values(8, null, 36, 1)",
+        "INSERT INTO root.sg.beijing.car02(timestamp, charging_status, soc, vehicle_status) values(9, 1, 45, 1)",
+        "INSERT INTO root.sg.beijing.car02(timestamp, charging_status, soc, vehicle_status) values(10, 1, 60, 1)",
+        "INSERT INTO root.sg.beijing.car02(timestamp, charging_status, soc, vehicle_status) values(1100000000, null, 60, 1)",
+        "INSERT INTO root.sg.beijing.car02(timestamp, charging_status, soc, vehicle_status) values(1200000000, null, 0, 0)",
+        "INSERT INTO root.sg.beijing.car02(timestamp, charging_status, soc, vehicle_status) values(1900000000, 1, 55, 1)",
+        "INSERT INTO root.sg.beijing.car02(timestamp, charging_status, soc, vehicle_status) values(2000000000, 1, 70, 1)",
+        "INSERT INTO root.sg.beijing.car02(timestamp, charging_status, soc, vehicle_status) values(2100000000, null, 70, 1)",
+        "INSERT INTO root.sg.beijing.car02(timestamp, charging_status, soc, vehicle_status) values(2200000000, null, 70, 1)",
+        "INSERT INTO root.sg.beijing.car02(timestamp, charging_status, soc, vehicle_status) values(2300000000, null, 69, 1)",
+        "INSERT INTO root.sg.beijing.car02(timestamp, charging_status, soc, vehicle_status) values(2400000000, 1, 80, 1)",
+        "INSERT INTO root.sg.beijing.car02(timestamp, charging_status, soc, vehicle_status) values(2500000000, 1, 100, 1)",
+        "INSERT INTO root.sg.beijing.car02(timestamp, charging_status, soc, vehicle_status) values(2600000000, 0, 101, 1)",
+        "flush"
+      };
+
+  @BeforeClass // one-time fixture: adjust the common config, start the environment, load both SQL scripts
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        .setEnableSeqSpaceCompaction(false) // all three compaction switches are set to false
+        .setEnableUnseqSpaceCompaction(false)
+        .setEnableCrossSpaceCompaction(false)
+        .setPartitionInterval(1000);
+    EnvFactory.getEnv().initClusterEnvironment(); // called after the config changes above
+    prepareData(SQLs); // statements for device car01
+    prepareData(SQLs2); // statements for device car02
+  }
+
+  @AfterClass // one-time cleanup after all tests of this class
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment(); // counterpart of initClusterEnvironment() in setUp
+  }
+
+  /**
+   * Asserts that the result set's column names equal the comma-separated names in {@code title}.
+   *
+   * <p>The column count is compared first, so a result with missing or extra columns fails with an
+   * assertion error instead of passing silently (fewer columns than expected) or throwing an
+   * {@link ArrayIndexOutOfBoundsException} (more columns than expected).
+   *
+   * @param resultSetMetaData metadata of the result set under test
+   * @param title expected column names in result-set order, separated by commas
+   * @throws SQLException if the metadata cannot be read
+   */
+  private void checkHeader(ResultSetMetaData resultSetMetaData, String title) throws SQLException {
+    String[] headers = title.split(",");
+    assertEquals(headers.length, resultSetMetaData.getColumnCount());
+    // JDBC column indexes are 1-based, the expected-name array is 0-based.
+    for (int i = 1; i <= headers.length; i++) {
+      assertEquals(headers[i - 1], resultSetMetaData.getColumnName(i));
+    }
+  }
+
+  private void normalTest(String[][] res, String sql, boolean hasEndTime) {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      try (ResultSet resultSet = statement.executeQuery(sql)) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        String title =
+            hasEndTime
+                ? "Time,__endTime,sum(root.sg.beijing.car01.charging_status),count(root.sg.beijing.car01.vehicle_status),last_value(root.sg.beijing.car01.soc)"
+                : "Time,sum(root.sg.beijing.car01.charging_status),count(root.sg.beijing.car01.vehicle_status),last_value(root.sg.beijing.car01.soc)";
+        checkHeader(resultSetMetaData, title);
+        int base = hasEndTime ? 1 : 0;
+        int count = 0;
+        while (resultSet.next()) {
+          String startTime = resultSet.getString(1);
+          String sum = resultSet.getString(2 + base);
+          String countNum = resultSet.getString(3 + base);
+          String lastValue = resultSet.getString(4 + base);
+          assertEquals(res[count][0], startTime);
+          assertEquals(res[count][2], sum);
+          assertEquals(res[count][3], countNum);
+          assertEquals(res[count][4], lastValue);
+          if (hasEndTime) {
+            String endTime = resultSet.getString(2);
+            assertEquals(res[count][1], endTime);
+          }
+          count++;
+        }
+        assertEquals(res.length, count);
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Runs {@code sql} and compares every returned row with {@code res}.
+   *
+   * <p>Each expected row is laid out as {startTime, endTime, first_value}. When {@code hasEndTime}
+   * is true the end time is read from column 2 and the first value from column 3; otherwise the
+   * first value is read from column 2 and the expected end time is not compared. The number of
+   * returned rows must equal {@code res.length}.
+   */
+  private void firstValueTest(String[][] res, String sql, boolean hasEndTime) {
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement stmt = conn.createStatement();
+        ResultSet rows = stmt.executeQuery(sql)) {
+      String expectedHeader = "Time,first_value(root.sg.beijing.car01.soc)";
+      if (hasEndTime) {
+        expectedHeader = "Time,__endTime,first_value(root.sg.beijing.car01.soc)";
+      }
+      checkHeader(rows.getMetaData(), expectedHeader);
+
+      int rowIndex = 0;
+      while (rows.next()) {
+        String actualStart = rows.getString(1);
+        assertEquals(res[rowIndex][0], actualStart);
+        if (!hasEndTime) {
+          String actualFirst = rows.getString(2);
+          assertEquals(res[rowIndex][2], actualFirst);
+        } else {
+          String actualEnd = rows.getString(2);
+          String actualFirst = rows.getString(3);
+          assertEquals(res[rowIndex][1], actualEnd);
+          assertEquals(res[rowIndex][2], actualFirst);
+        }
+        rowIndex++;
+      }
+      assertEquals(res.length, rowIndex);
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /** Windows of 5 points on charging_status with the default null handling; two windows expected. */
+  @Test
+  public void groupByCountNormalTest1() {
+    final String plainQuery =
+        "select sum(charging_status),count(vehicle_status),last_value(soc) from root.sg.beijing.car01 group by count(charging_status, 5)";
+    final String endTimeQuery =
+        "select __endTime,sum(charging_status),count(vehicle_status),last_value(soc) from root.sg.beijing.car01 group by count(charging_status, 5)";
+    // Each row: {startTime, endTime, sum, count, last_value}.
+    final String[][] expected = {
+      {"1", "5", "3.0", "5", "18"},
+      {"6", "1900000000", "5.0", "5", "55"},
+    };
+    normalTest(expected, plainQuery, false);
+    normalTest(expected, endTimeQuery, true);
+  }
+
+  /** Windows of 12 points on charging_status with the default null handling; one window expected. */
+  @Test
+  public void groupByCountNormalTest2() {
+    final String plainQuery =
+        "select sum(charging_status),count(vehicle_status),last_value(soc) from root.sg.beijing.car01 group by count(charging_status, 12)";
+    final String endTimeQuery =
+        "select __endTime,sum(charging_status),count(vehicle_status),last_value(soc) from root.sg.beijing.car01 group by count(charging_status, 12)";
+    // Single row: {startTime, endTime, sum, count, last_value}.
+    final String[][] expected = {{"1", "2400000000", "10.0", "12", "80"}};
+    normalTest(expected, plainQuery, false);
+    normalTest(expected, endTimeQuery, true);
+  }
+
+  /** first_value(soc) over windows of 5 points on charging_status; two windows expected. */
+  @Test
+  public void groupByCountNormalTest3() {
+    final String plainQuery =
+        "select first_value(soc) from root.sg.beijing.car01 group by count(charging_status, 5)";
+    final String endTimeQuery =
+        "select __endTime,first_value(soc) from root.sg.beijing.car01 group by count(charging_status, 5)";
+    // Each row: {startTime, endTime, first_value}.
+    final String[][] expected = {
+      {"1", "5", "14"},
+      {"6", "1900000000", "24"},
+    };
+    firstValueTest(expected, plainQuery, false);
+    firstValueTest(expected, endTimeQuery, true);
+  }
+
+  /** first_value(soc) over windows of 7 points on charging_status; two windows expected. */
+  @Test
+  public void groupByCountNormalTest4() {
+    final String plainQuery =
+        "select first_value(soc) from root.sg.beijing.car01 group by count(charging_status, 7)";
+    final String endTimeQuery =
+        "select __endTime,first_value(soc) from root.sg.beijing.car01 group by count(charging_status, 7)";
+    // Each row: {startTime, endTime, first_value}.
+    final String[][] expected = {
+      {"1", "7", "14"},
+      {"9", "2600000000", "45"},
+    };
+    firstValueTest(expected, plainQuery, false);
+    firstValueTest(expected, endTimeQuery, true);
+  }
+
+  /** first_value(soc) over windows of 2 points on charging_status; seven windows expected. */
+  @Test
+  public void groupByCountNormalTest5() {
+    final String plainQuery =
+        "select first_value(soc) from root.sg.beijing.car01 group by count(charging_status, 2)";
+    final String endTimeQuery =
+        "select __endTime,first_value(soc) from root.sg.beijing.car01 group by count(charging_status, 2)";
+    // Each row: {startTime, endTime, first_value}.
+    final String[][] expected = {
+      {"1", "2", "14"},
+      {"3", "4", "16"},
+      {"5", "6", "18"},
+      {"7", "9", "36"},
+      {"10", "1900000000", "60"},
+      {"2000000000", "2400000000", "70"},
+      {"2500000000", "2600000000", "100"}
+    };
+    firstValueTest(expected, plainQuery, false);
+    firstValueTest(expected, endTimeQuery, true);
+  }
+
+  /** sum/count/last_value over windows of 2 points on charging_status; seven windows expected. */
+  @Test
+  public void groupByCountNormalTest6() {
+    final String plainQuery =
+        "select sum(charging_status),count(vehicle_status),last_value(soc) from root.sg.beijing.car01 group by count(charging_status, 2)";
+    final String endTimeQuery =
+        "select __endTime,sum(charging_status),count(vehicle_status),last_value(soc) from root.sg.beijing.car01 group by count(charging_status, 2)";
+    // Each row: {startTime, endTime, sum, count, last_value}.
+    final String[][] expected = {
+      {"1", "2", "2.0", "2", "16"},
+      {"3", "4", "0.0", "2", "16"},
+      {"5", "6", "2.0", "2", "24"},
+      {"7", "9", "2.0", "2", "45"},
+      {"10", "1900000000", "2.0", "2", "55"},
+      {"2000000000", "2400000000", "2.0", "2", "80"},
+      {"2500000000", "2600000000", "1.0", "2", "101"}
+    };
+    normalTest(expected, plainQuery, false);
+    normalTest(expected, endTimeQuery, true);
+  }
+
+  /** Windows of 2 points filtered by HAVING sum(charging_status)<2; two windows expected. */
+  @Test
+  public void groupByCountNormalTest7() {
+    final String havingQuery =
+        "select sum(charging_status),count(vehicle_status),last_value(soc) from root.sg.beijing.car01 group by count(charging_status, 2) having sum(charging_status)<2";
+    // Each row: {startTime, endTime, sum, count, last_value}; endTime is not checked here.
+    final String[][] expected = {
+      {"3", "4", "0.0", "2", "16"},
+      {"2500000000", "2600000000", "1.0", "2", "101"}
+    };
+    normalTest(expected, havingQuery, false);
+  }
+
+  /** Windows of 5 points with ignoreNull=false; four windows expected. */
+  @Test
+  public void groupByCountNormalTest8() {
+    final String plainQuery =
+        "select sum(charging_status),count(vehicle_status),last_value(soc) from root.sg.beijing.car01 group by count(charging_status, 5, ignoreNull=false)";
+    final String endTimeQuery =
+        "select __endTime,sum(charging_status),count(vehicle_status),last_value(soc) from root.sg.beijing.car01 group by count(charging_status, 5, ignoreNull=false)";
+    // Each row: {startTime, endTime, sum, count, last_value}.
+    final String[][] expected = {
+      {"1", "5", "3.0", "5", "18"},
+      {"6", "10", "4.0", "5", "60"},
+      {"1100000000", "2100000000", "2.0", "5", "70"},
+      {"2200000000", "2600000000", "2.0", "5", "101"},
+    };
+    normalTest(expected, plainQuery, false);
+    normalTest(expected, endTimeQuery, true);
+  }
+
+  /** first_value(soc) over windows of 2 points with ignoreNull=false; ten windows expected. */
+  @Test
+  public void groupByCountNormalTest9() {
+    final String plainQuery =
+        "select first_value(soc) from root.sg.beijing.car01 group by count(charging_status, 2, ignoreNull=false)";
+    final String endTimeQuery =
+        "select __endTime,first_value(soc) from root.sg.beijing.car01 group by count(charging_status, 2, ignoreNull=false)";
+    // Each row: {startTime, endTime, first_value}.
+    final String[][] expected = {
+      {"1", "2", "14"},
+      {"3", "4", "16"},
+      {"5", "6", "18"},
+      {"7", "8", "36"},
+      {"9", "10", "45"},
+      {"1100000000", "1200000000", "60"},
+      {"1900000000", "2000000000", "55"},
+      {"2100000000", "2200000000", "70"},
+      {"2300000000", "2400000000", "69"},
+      {"2500000000", "2600000000", "100"}
+    };
+    firstValueTest(expected, plainQuery, false);
+    firstValueTest(expected, endTimeQuery, true);
+  }
+
+  @Test
+  public void groupByCountNormalTest10() {
+    // Window size 12 with ignoreNull = false: a single result row is expected, checked both
+    // without and with the __endTime column.
+    String queryTail =
+        "sum(charging_status),count(vehicle_status),last_value(soc) from root.sg.beijing.car01 group by count(charging_status, 12, ignoreNull = false)";
+    String[][] expectedRows = {{"1", "1200000000", "7.0", "12", "0"}};
+    normalTest(expectedRows, "select " + queryTail, false);
+    normalTest(expectedRows, "select __endTime," + queryTail, true);
+  }
+
+  // Checks an ALIGN BY DEVICE result set: the first res.length rows must come from car01, then
+  // the row position wraps to 0 and car02 is expected; a res row is {start, end, sum, count, last}.
+  // NOTE(review): the closing count check also passes when no car02 row is returned at all;
+  // confirm whether the second device should be required explicitly.
+  private void normalTestWithAlignByDevice(String[][] res, String sql, boolean hasEndTime) {
+    String expectedHeader =
+        hasEndTime
+            ? "Time,Device,__endTime,sum(charging_status),count(vehicle_status),last_value(soc)"
+            : "Time,Device,sum(charging_status),count(vehicle_status),last_value(soc)";
+    int firstAggregateColumn = hasEndTime ? 4 : 3;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), expectedHeader);
+      String expectedDevice = "root.sg.beijing.car01";
+      int rowInDevice = 0;
+      while (resultSet.next()) {
+        if (rowInDevice == res.length) {
+          rowInDevice = 0;
+          expectedDevice = "root.sg.beijing.car02";
+        }
+        String startTime = resultSet.getString(1);
+        String device = resultSet.getString(2);
+        String sum = resultSet.getString(firstAggregateColumn);
+        String countNum = resultSet.getString(firstAggregateColumn + 1);
+        String lastValue = resultSet.getString(firstAggregateColumn + 2);
+        assertEquals(expectedDevice, device);
+        assertEquals(res[rowInDevice][0], startTime);
+        assertEquals(res[rowInDevice][2], sum);
+        assertEquals(res[rowInDevice][3], countNum);
+        assertEquals(res[rowInDevice][4], lastValue);
+        if (hasEndTime) {
+          assertEquals(res[rowInDevice][1], resultSet.getString(3));
+        }
+        rowInDevice++;
+      }
+      assertEquals(res.length, rowInDevice);
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void groupByCountAlignByDeviceTest() {
+    // ALIGN BY DEVICE over root.** with windows of 2 rows; the helper compares each device's rows
+    // against the same expected table, without and with the __endTime column.
+    String queryTail =
+        "sum(charging_status),count(vehicle_status),last_value(soc) from root.** group by count(charging_status, 2) align by device";
+    String[][] expectedRows = {
+      {"1", "2", "2.0", "2", "16"},
+      {"3", "4", "0.0", "2", "16"},
+      {"5", "6", "2.0", "2", "24"},
+      {"7", "9", "2.0", "2", "45"},
+      {"10", "1900000000", "2.0", "2", "55"},
+      {"2000000000", "2400000000", "2.0", "2", "80"},
+      {"2500000000", "2600000000", "1.0", "2", "101"}
+    };
+    normalTestWithAlignByDevice(expectedRows, "select " + queryTail, false);
+    normalTestWithAlignByDevice(expectedRows, "select __endTime," + queryTail, true);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/AbstractVariationWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/AbstractVariationWindow.java
index 5ce44233b7..e1ce2e32af 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/AbstractVariationWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/AbstractVariationWindow.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.mpp.execution.operator.window;
 
-import org.apache.iotdb.db.mpp.aggregation.Accumulator;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 
@@ -47,11 +46,6 @@ public abstract class AbstractVariationWindow implements IWindow {
     return tsBlock.getColumn(controlColumnIndex);
   }
 
-  @Override
-  public boolean hasFinalResult(Accumulator accumulator) {
-    return accumulator.hasFinalResult();
-  }
-
   @Override
   public boolean contains(Column column) {
     return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/ConditionWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/ConditionWindow.java
index 5ed29a80d1..dc8e8ca0dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/ConditionWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/ConditionWindow.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.mpp.execution.operator.window;
 
-import org.apache.iotdb.db.mpp.aggregation.Accumulator;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 
@@ -66,11 +65,6 @@ public class ConditionWindow implements IWindow {
     }
   }
 
-  @Override
-  public boolean hasFinalResult(Accumulator accumulator) {
-    return accumulator.hasFinalResult();
-  }
-
   @Override
   public boolean contains(Column column) {
     return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/AbstractVariationWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/CountWindow.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/AbstractVariationWindow.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/CountWindow.java
index 5ce44233b7..5438768016 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/AbstractVariationWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/CountWindow.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,30 +16,28 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.iotdb.db.mpp.execution.operator.window;
 
-import org.apache.iotdb.db.mpp.aggregation.Accumulator;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 
-public abstract class AbstractVariationWindow implements IWindow {
+public class CountWindow implements IWindow {
 
-  private final double delta;
   private final int controlColumnIndex;
-  private final boolean outputEndTime;
+  private final boolean needOutputEndTime;
   private final boolean ignoreNull;
-
-  protected long startTime;
-  protected long endTime;
-  protected boolean initializedHeadValue;
-  protected boolean valueIsNull = false;
-
-  protected AbstractVariationWindow(VariationWindowParameter variationWindowParameter) {
-    this.controlColumnIndex = variationWindowParameter.getControlColumnIndex();
-    this.ignoreNull = variationWindowParameter.isIgnoringNull();
-    this.outputEndTime = variationWindowParameter.isNeedOutputEndTime();
-    this.delta = variationWindowParameter.getDelta();
+  private final long countNumber;
+  private long startTime = Long.MAX_VALUE;
+  private long endTime = Long.MIN_VALUE;
+
+  private long leftCount;
+
+  public CountWindow(CountWindowParameter countWindowParameter) {
+    this.controlColumnIndex = countWindowParameter.getControlColumnIndex();
+    this.needOutputEndTime = countWindowParameter.isNeedOutputEndTime();
+    this.countNumber = countWindowParameter.getCountNumber();
+    this.ignoreNull = countWindowParameter.isIgnoreNull();
+    resetCurCount();
   }
 
   @Override
@@ -48,8 +46,16 @@ public abstract class AbstractVariationWindow implements IWindow {
   }
 
   @Override
-  public boolean hasFinalResult(Accumulator accumulator) {
-    return accumulator.hasFinalResult();
+  public boolean satisfy(Column column, int index) {
+    return leftCount != 0;
+  }
+
+  @Override
+  public void mergeOnePoint(Column[] timeAndValueColumn, int index) {
+    long currentTime = timeAndValueColumn[1].getLong(index);
+    startTime = Math.min(startTime, currentTime);
+    endTime = Math.max(endTime, currentTime);
+    leftCount--;
   }
 
   @Override
@@ -57,41 +63,39 @@ public abstract class AbstractVariationWindow implements IWindow {
     return false;
   }
 
-  public abstract void updatePreviousValue();
+  public boolean isNeedOutputEndTime() {
+    return needOutputEndTime;
+  }
 
-  public long getStartTime() {
-    return startTime;
+  public void resetCurCount() {
+    setLeftCount(countNumber);
   }
 
-  public void setStartTime(long startTime) {
-    this.startTime = startTime;
+  public long getStartTime() {
+    return startTime;
   }
 
   public long getEndTime() {
     return endTime;
   }
 
-  public void setEndTime(long endTime) {
-    this.endTime = endTime;
-  }
-
-  public void setInitializedHeadValue(boolean initializedHeadValue) {
-    this.initializedHeadValue = initializedHeadValue;
+  public long getLeftCount() {
+    return leftCount;
   }
 
-  public boolean ignoreNull() {
-    return ignoreNull;
+  public void setLeftCount(long leftCount) {
+    this.leftCount = leftCount;
   }
 
-  public boolean valueIsNull() {
-    return valueIsNull;
+  public void setEndTime(long endTime) {
+    this.endTime = endTime;
   }
 
-  public boolean isOutputEndTime() {
-    return outputEndTime;
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
   }
 
-  public double getDelta() {
-    return delta;
+  public boolean isIgnoreNull() {
+    return ignoreNull;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/CountWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/CountWindowManager.java
new file mode 100644
index 0000000000..61a6ae7669
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/CountWindowManager.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+import java.util.List;
+
+/**
+ * Window manager for GROUP BY COUNT. It owns a single {@link CountWindow} that is reset and
+ * reused for each window, and remembers whether a skip pass over the input is still pending.
+ */
+public class CountWindowManager implements IWindowManager {
+
+  /** The one window instance, reused for every successive count window. */
+  private final CountWindow countWindow;
+
+  /** Set by {@link #next()}; cleared when a skip pass stops before the end of a block. */
+  private boolean needSkip;
+
+  /** True from {@link #initCurWindow()} until the following {@link #next()}. */
+  private boolean initialized;
+
+  /**
+   * Creates the manager together with its backing window.
+   *
+   * @param countWindowParameter settings passed on to the {@link CountWindow} constructor
+   */
+  public CountWindowManager(CountWindowParameter countWindowParameter) {
+    this.needSkip = false;
+    this.countWindow = new CountWindow(countWindowParameter);
+  }
+
+  @Override
+  public boolean isCurWindowInit() {
+    return initialized;
+  }
+
+  /** Marks the window as initialized and restores its row budget and time bounds. */
+  @Override
+  public void initCurWindow() {
+    countWindow.setStartTime(Long.MAX_VALUE);
+    countWindow.setEndTime(Long.MIN_VALUE);
+    countWindow.resetCurCount();
+    initialized = true;
+  }
+
+  /** Another window is possible exactly when the caller reports more input data. */
+  @Override
+  public boolean hasNext(boolean hasMoreData) {
+    return hasMoreData;
+  }
+
+  /** Leaves the current window: it must be initialized again and a skip pass is requested. */
+  @Override
+  public void next() {
+    initialized = false;
+    needSkip = true;
+  }
+
+  @Override
+  public IWindow getCurWindow() {
+    return countWindow;
+  }
+
+  /**
+   * While a skip is pending, walks the block from its first row, charging each counted row
+   * against the window's remaining count and widening the window's start/end time to cover it.
+   * The walk stops at the first counted row that finds the remaining count at zero.
+   *
+   * @param inputTsBlock block to scan; returned as is when no skip is pending or when it is null
+   *     or empty
+   * @return the part of the block that starts at the row where the walk stopped
+   */
+  @Override
+  public TsBlock skipPointsOutOfCurWindow(TsBlock inputTsBlock) {
+    if (!needSkip || inputTsBlock == null || inputTsBlock.isEmpty()) {
+      return inputTsBlock;
+    }
+
+    final TimeColumn timeColumn = inputTsBlock.getTimeColumn();
+    final Column controlColumn = countWindow.getControlColumn(inputTsBlock);
+    final int rowCount = inputTsBlock.getPositionCount();
+    final boolean skipNullRows = isIgnoringNull();
+    long remaining = countWindow.getLeftCount();
+    long minTime = countWindow.getStartTime();
+    long maxTime = countWindow.getEndTime();
+
+    int row = 0;
+    while (row < rowCount) {
+      // With ignoreNull set, rows whose control value is null are stepped over uncounted.
+      if (!(skipNullRows && controlColumn.isNull(row))) {
+        // A count window holds exactly countNumber rows; a remaining count of zero means the
+        // window is finished.
+        if (remaining == 0) {
+          break;
+        }
+        remaining--;
+        long rowTime = timeColumn.getLong(row);
+        // widen the start time downwards and the end time upwards to include this row
+        minTime = Math.min(minTime, rowTime);
+        maxTime = Math.max(maxTime, rowTime);
+      }
+      row++;
+    }
+
+    countWindow.setStartTime(minTime);
+    countWindow.setEndTime(maxTime);
+    countWindow.setLeftCount(remaining);
+
+    // Stopping before the end of the block means a new window can begin at that row.
+    if (row < rowCount) {
+      needSkip = false;
+    }
+    return inputTsBlock.subTsBlock(row);
+  }
+
+  /**
+   * Creates the result builder from the aggregators' result types, putting an INT64 column in
+   * front of them when the end time has to be output.
+   */
+  @Override
+  public TsBlockBuilder createResultTsBlockBuilder(List<Aggregator> aggregators) {
+    List<TSDataType> resultTypes = getResultDataTypes(aggregators);
+    if (countWindow.isNeedOutputEndTime()) {
+      resultTypes.add(0, TSDataType.INT64);
+    }
+    return new TsBlockBuilder(resultTypes);
+  }
+
+  /**
+   * Outputs the window's aggregation results, but only once its remaining count has reached
+   * zero; a window that is still short of rows produces nothing. The end time handed to
+   * outputAggregators is -1 when the end time column is not requested.
+   */
+  @Override
+  public void appendAggregationResult(
+      TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
+    if (countWindow.getLeftCount() == 0) {
+      long endTime = -1;
+      if (countWindow.isNeedOutputEndTime()) {
+        endTime = countWindow.getEndTime();
+      }
+      outputAggregators(aggregators, resultTsBlockBuilder, countWindow.getStartTime(), endTime);
+    }
+  }
+
+  /** Always true for this manager. */
+  @Override
+  public boolean needSkipInAdvance() {
+    return true;
+  }
+
+  // ignoreNull in CountWindow may be ambiguous.
+  @Override
+  public boolean isIgnoringNull() {
+    return countWindow.isIgnoreNull();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/CountWindowParameter.java
similarity index 53%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/CountWindowParameter.java
index acc12791da..43b34ec02b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/CountWindowParameter.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,22 +16,31 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.iotdb.db.mpp.execution.operator.window;
 
-public enum WindowType {
-  TIME_WINDOW((byte) 0),
-  VARIATION_WINDOW((byte) 1),
-  CONDITION_WINDOW((byte) 2),
-  SESSION_WINDOW((byte) 3);
+public class CountWindowParameter extends WindowParameter {
+  private final long countNumber;
+  private final int controlColumnIndex;
+  private final boolean ignoreNull;
+
+  public CountWindowParameter(
+      long countNumber, int controlColumnIndex, boolean needOutputEndTime, boolean ignoreNull) {
+    super(needOutputEndTime);
+    this.windowType = WindowType.COUNT_WINDOW;
+    this.countNumber = countNumber;
+    this.controlColumnIndex = controlColumnIndex;
+    this.ignoreNull = ignoreNull;
+  }
 
-  private final byte type;
+  public int getControlColumnIndex() {
+    return controlColumnIndex;
+  }
 
-  WindowType(byte type) {
-    this.type = type;
+  public long getCountNumber() {
+    return countNumber;
   }
 
-  public byte getType() {
-    return type;
+  public boolean isIgnoreNull() {
+    return ignoreNull;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindow.java
index ee39f197c0..50fff68684 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindow.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.mpp.execution.operator.window;
 
-import org.apache.iotdb.db.mpp.aggregation.Accumulator;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 
@@ -36,9 +35,9 @@ public interface IWindow {
   /**
    * Judge whether the point at index of column belongs to this window
    *
-   * @param column
-   * @param index
-   * @return
+   * @param column the control column of the window
+   * @param index the row index in column
+   * @return true if the row at index of column satisfies the window
    */
   boolean satisfy(Column column, int index);
 
@@ -48,19 +47,11 @@ public interface IWindow {
    */
   void mergeOnePoint(Column[] timeAndValueColumn, int index);
 
-  /**
-   * Used to customize whether the window has final aggregation result
-   *
-   * @param accumulator
-   * @return
-   */
-  boolean hasFinalResult(Accumulator accumulator);
-
   /**
    * Used to judge whether the window has contains the column
    *
-   * @param column
-   * @return
+   * @param column the control column of the window
+   * @return true if the whole column satisfies the window
    */
   boolean contains(Column column);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindow.java
index 79b22dd858..7f3cea63d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindow.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.mpp.execution.operator.window;
 
-import org.apache.iotdb.db.mpp.aggregation.Accumulator;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
@@ -86,11 +85,6 @@ public class SessionWindow implements IWindow {
     setLastTsBlockTime(timeValue);
   }
 
-  @Override
-  public boolean hasFinalResult(Accumulator accumulator) {
-    return accumulator.hasFinalResult();
-  }
-
   @Override
   public boolean contains(Column column) {
     TimeColumn timeColumn = (TimeColumn) column;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindow.java
index 1927316a24..4b07a584fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindow.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.mpp.execution.operator.window;
 
-import org.apache.iotdb.db.mpp.aggregation.Accumulator;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -29,8 +28,6 @@ public class TimeWindow implements IWindow {
 
   private TimeRange curTimeRange;
 
-  public TimeWindow() {}
-
   public TimeWindow(TimeRange curTimeRange) {
     this.curTimeRange = curTimeRange;
   }
@@ -63,11 +60,6 @@ public class TimeWindow implements IWindow {
     // do nothing
   }
 
-  @Override
-  public boolean hasFinalResult(Accumulator accumulator) {
-    return accumulator.hasFinalResult();
-  }
-
   @Override
   public boolean contains(Column column) {
     TimeColumn timeColumn = (TimeColumn) column;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowManagerFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowManagerFactory.java
index 8e5df9a2d0..cfe5f52dce 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowManagerFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowManagerFactory.java
@@ -40,6 +40,8 @@ public class WindowManagerFactory {
             windowParameter.isNeedOutputEndTime(),
             ((SessionWindowParameter) windowParameter).getTimeInterval(),
             ascending);
+      case COUNT_WINDOW:
+        return new CountWindowManager((CountWindowParameter) windowParameter);
       default:
         throw new IllegalArgumentException(
             "Not support this type of aggregation window :"
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java
index acc12791da..5d16a7d65c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java
@@ -23,7 +23,8 @@ public enum WindowType {
   TIME_WINDOW((byte) 0),
   VARIATION_WINDOW((byte) 1),
   CONDITION_WINDOW((byte) 2),
-  SESSION_WINDOW((byte) 3);
+  SESSION_WINDOW((byte) 3),
+  COUNT_WINDOW((byte) 4);
 
   private final byte type;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 33bb1aec79..a6bc60a90e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -68,6 +68,7 @@ import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByConditionParameter;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByCountParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupBySessionParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
@@ -80,6 +81,7 @@ import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.mpp.plan.statement.component.FillComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByConditionComponent;
+import org.apache.iotdb.db.mpp.plan.statement.component.GroupByCountComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupBySessionComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByVariationComponent;
@@ -1233,6 +1235,13 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
           new GroupBySessionParameter(
               ((GroupBySessionComponent) groupByComponent).getTimeInterval());
       analysis.setGroupByParameter(groupByParameter);
+    } else if (windowType == WindowType.COUNT_WINDOW) {
+      GroupByParameter groupByParameter =
+          new GroupByCountParameter(
+              ((GroupByCountComponent) groupByComponent).getCountNumber(),
+              groupByComponent.isIgnoringNull());
+      analysis.setGroupByParameter(groupByParameter);
+      analysis.setDeviceToGroupByExpression(deviceToGroupByExpression);
     } else {
       throw new SemanticException("Unsupported window type");
     }
@@ -1284,6 +1293,14 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       long interval = ((GroupBySessionComponent) groupByComponent).getTimeInterval();
       GroupByParameter groupByParameter = new GroupBySessionParameter(interval);
       analysis.setGroupByParameter(groupByParameter);
+    } else if (windowType == WindowType.COUNT_WINDOW) {
+      GroupByParameter groupByParameter =
+          new GroupByCountParameter(
+              ((GroupByCountComponent) groupByComponent).getCountNumber(),
+              groupByComponent.isIgnoringNull());
+      analyzeExpression(analysis, groupByExpression);
+      analysis.setGroupByExpression(groupByExpression);
+      analysis.setGroupByParameter(groupByParameter);
     } else {
       throw new SemanticException("Unsupported window type");
     }
@@ -1307,8 +1324,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     }
 
     // check keep Expression
-    if (keepExpression instanceof ConstantOperand) {
-    } else if (keepExpression instanceof CompareBinaryExpression) {
+    if (keepExpression instanceof CompareBinaryExpression) {
       Expression leftExpression = ((CompareBinaryExpression) keepExpression).getLeftExpression();
       Expression rightExpression = ((CompareBinaryExpression) keepExpression).getRightExpression();
       if (!(leftExpression instanceof TimeSeriesOperand
@@ -1319,7 +1335,9 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
                 "Please check the keep condition ([%s]),it need to be a constant or a compare expression constructed by 'keep' and a long number.",
                 keepExpression.getExpressionString()));
       }
-    } else {
+      return;
+    }
+    if (!(keepExpression instanceof ConstantOperand)) {
       throw new SemanticException(
           String.format(
               "Please check the keep condition ([%s]),it need to be a constant or a compare expression constructed by 'keep' and a long number.",
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 811b377e7d..580ed76379 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -72,6 +72,7 @@ import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
 import org.apache.iotdb.db.mpp.plan.statement.component.FromComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByConditionComponent;
+import org.apache.iotdb.db.mpp.plan.statement.component.GroupByCountComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByLevelComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupBySessionComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTagComponent;
@@ -1010,6 +1011,15 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
           groupByKeys.add("COMMON");
           queryStatement.setGroupByComponent(
               parseGroupByClause(groupByAttribute, WindowType.SESSION_WINDOW));
+        } else if (groupByAttribute.COUNT() != null) {
+          if (groupByKeys.contains("COMMON")) {
+            throw new SemanticException(GROUP_BY_COMMON_ONLY_ONE_MSG);
+          }
+
+          groupByKeys.add("COMMON");
+          queryStatement.setGroupByComponent(
+              parseGroupByClause(groupByAttribute, WindowType.COUNT_WINDOW));
+
         } else {
           throw new SemanticException("Unknown GROUP BY type.");
         }
@@ -1259,6 +1269,14 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
     } else if (windowType == WindowType.SESSION_WINDOW) {
       long interval = DateTimeUtils.convertDurationStrToLong(ctx.timeInterval.getText());
       return new GroupBySessionComponent(interval);
+    } else if (windowType == WindowType.COUNT_WINDOW) {
+      ExpressionContext countExpressionContext = expressions.get(0);
+      long countNumber = Long.parseLong(ctx.countNumber.getText());
+      GroupByCountComponent groupByCountComponent = new GroupByCountComponent(countNumber);
+      groupByCountComponent.setControlColumnExpression(
+          parseExpression(countExpressionContext, true));
+      groupByCountComponent.setIgnoringNull(ignoringNull);
+      return groupByCountComponent;
     } else {
       throw new SemanticException("Unsupported window type");
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 5ea018928a..0e556f45bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -124,6 +124,7 @@ import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOp
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.ShowQueriesOperator;
 import org.apache.iotdb.db.mpp.execution.operator.window.ConditionWindowParameter;
+import org.apache.iotdb.db.mpp.execution.operator.window.CountWindowParameter;
 import org.apache.iotdb.db.mpp.execution.operator.window.SessionWindowParameter;
 import org.apache.iotdb.db.mpp.execution.operator.window.TimeWindowParameter;
 import org.apache.iotdb.db.mpp.execution.operator.window.VariationWindowParameter;
@@ -190,6 +191,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregatio
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByConditionParameter;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByCountParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupBySessionParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
@@ -1502,6 +1504,21 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                     ((GroupBySessionParameter) groupByParameter).getTimeInterval(),
                     node.isOutputEndTime());
             break;
+          case COUNT_WINDOW:
+            Expression groupByCountExpression = node.getGroupByExpression();
+            if (groupByCountExpression == null) {
+              throw new IllegalArgumentException("groupByCountExpression can't be null");
+            }
+            windowParameter =
+                new CountWindowParameter(
+                    ((GroupByCountParameter) groupByParameter).getCountNumber(),
+                    layout
+                        .get(groupByCountExpression.getExpressionString())
+                        .get(0)
+                        .getValueColumnIndex(),
+                    node.isOutputEndTime(),
+                    ((GroupByCountParameter) groupByParameter).isIgnoreNull());
+            break;
           default:
             throw new IllegalArgumentException("Unsupported window type");
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByCountParameter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByCountParameter.java
new file mode 100644
index 0000000000..c565cafdd0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByCountParameter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.planner.plan.parameter;
+
+import org.apache.iotdb.db.mpp.execution.operator.window.WindowType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class GroupByCountParameter extends GroupByParameter {
+
+  private final long countNumber;
+  private final boolean ignoreNull;
+
+  public GroupByCountParameter(long countNumber, boolean ignoreNull) {
+    super(WindowType.COUNT_WINDOW);
+    this.countNumber = countNumber;
+    this.ignoreNull = ignoreNull;
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    ReadWriteIOUtils.write(countNumber, byteBuffer);
+    ReadWriteIOUtils.write(ignoreNull, byteBuffer);
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    ReadWriteIOUtils.write(countNumber, stream);
+    ReadWriteIOUtils.write(ignoreNull, stream);
+  }
+
+  public long getCountNumber() {
+    return countNumber;
+  }
+
+  public boolean isIgnoreNull() {
+    return ignoreNull;
+  }
+
+  public static GroupByParameter deserialize(ByteBuffer buffer) {
+    long countNumber = ReadWriteIOUtils.readLong(buffer);
+    boolean ignoreNull = ReadWriteIOUtils.readBool(buffer);
+    return new GroupByCountParameter(countNumber, ignoreNull);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    if (!super.equals(obj)) {
+      return false;
+    }
+    return this.countNumber == ((GroupByCountParameter) obj).getCountNumber()
+        || this.ignoreNull == ((GroupByCountParameter) obj).isIgnoreNull();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), countNumber, ignoreNull);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByCountComponent.java
similarity index 63%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByCountComponent.java
index acc12791da..7b7e32300c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByCountComponent.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,22 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.iotdb.db.mpp.plan.statement.component;
 
-package org.apache.iotdb.db.mpp.execution.operator.window;
+import org.apache.iotdb.db.mpp.execution.operator.window.WindowType;
 
-public enum WindowType {
-  TIME_WINDOW((byte) 0),
-  VARIATION_WINDOW((byte) 1),
-  CONDITION_WINDOW((byte) 2),
-  SESSION_WINDOW((byte) 3);
+public class GroupByCountComponent extends GroupByComponent {
+  private final long countNumber;
 
-  private final byte type;
-
-  WindowType(byte type) {
-    this.type = type;
+  public GroupByCountComponent(long countNumber) {
+    super(WindowType.COUNT_WINDOW);
+    this.countNumber = countNumber;
   }
 
-  public byte getType() {
-    return type;
+  public long getCountNumber() {
+    return countNumber;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
index 2be3358e20..f2fffba941 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
@@ -303,8 +303,12 @@ public class QueryStatement extends Statement {
         && groupByComponent.getWindowType() == WindowType.CONDITION_WINDOW;
   }
 
+  private boolean isGroupByCount() {
+    return groupByComponent != null && groupByComponent.getWindowType() == WindowType.COUNT_WINDOW;
+  }
+
   public boolean hasGroupByExpression() {
-    return isGroupByVariation() || isGroupByCondition();
+    return isGroupByVariation() || isGroupByCondition() || isGroupByCount();
   }
 
   public boolean isAlignByTime() {