You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/12 06:21:35 UTC

[iotdb] branch master updated: [IOTDB-3784] tag aggregation: basic support (#6661)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 71ba686386 [IOTDB-3784] tag aggregation: basic support (#6661)
71ba686386 is described below

commit 71ba686386a7a6832cb6233ab8342141cdbbe29f
Author: Zhong Wang <wa...@alibaba-inc.com>
AuthorDate: Wed Oct 12 14:21:28 2022 +0800

    [IOTDB-3784] tag aggregation: basic support (#6661)
---
 .../org/apache/iotdb/db/qp/sql/IdentifierParser.g4 |   2 +-
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |   9 +-
 docs/UserGuide/Query-Data/Aggregate-Query.md       | 174 +++++++
 docs/zh/UserGuide/Query-Data/Aggregate-Query.md    | 168 ++++++-
 .../db/it/aggregation/IoTDBTagAggregationIT.java   | 511 +++++++++++++++++++++
 .../apache/iotdb/commons/path/MeasurementPath.java |  35 ++
 .../schemaregion/rocksdb/RSchemaRegion.java        | 119 ++---
 .../iotdb/db/metadata/LocalSchemaProcessor.java    |   6 +-
 .../db/metadata/cache/DataNodeSchemaCache.java     |   3 +
 .../iotdb/db/metadata/cache/SchemaCacheEntry.java  |  11 +-
 .../iotdb/db/metadata/mtree/IMTreeBelowSG.java     |   5 +-
 .../db/metadata/mtree/MTreeBelowSGCachedImpl.java  |  15 +-
 .../db/metadata/mtree/MTreeBelowSGMemoryImpl.java  |  32 +-
 .../db/metadata/schemaregion/ISchemaRegion.java    |  10 +-
 .../schemaregion/SchemaRegionMemoryImpl.java       |  66 ++-
 .../schemaregion/SchemaRegionSchemaFileImpl.java   |  59 ++-
 .../apache/iotdb/db/metadata/tag/TagManager.java   |  15 +
 .../mpp/common/schematree/ClusterSchemaTree.java   |   9 +-
 .../schematree/node/SchemaMeasurementNode.java     |  14 +
 .../visitor/SchemaTreeMeasurementVisitor.java      |   1 +
 .../operator/process/TagAggregationOperator.java   | 233 ++++++++++
 .../operator/schema/SchemaFetchScanOperator.java   |   7 +-
 .../apache/iotdb/db/mpp/plan/analyze/Analysis.java |  48 +-
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  98 +++-
 .../db/mpp/plan/analyze/ClusterSchemaFetcher.java  |  13 +-
 .../db/mpp/plan/analyze/ExpressionAnalyzer.java    |   2 +
 .../iotdb/db/mpp/plan/analyze/ISchemaFetcher.java  |   2 +
 .../mpp/plan/analyze/StandaloneSchemaFetcher.java  |  12 +-
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       |  37 ++
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    | 143 +++++-
 .../db/mpp/plan/planner/LogicalPlanVisitor.java    |  13 +-
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  77 +++-
 .../db/mpp/plan/planner/SubPlanTypeExtractor.java  |   9 +
 .../planner/distribution/ExchangeNodeAdder.java    |   5 +
 .../plan/planner/distribution/SourceRewriter.java  |  91 +++-
 .../plan/planner/plan/node/PlanGraphPrinter.java   |  31 ++
 .../mpp/plan/planner/plan/node/PlanNodeType.java   |   6 +-
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java |   5 +
 .../node/metedata/read/SchemaFetchScanNode.java    |  19 +-
 .../plan/node/process/GroupByLevelNode.java        |  21 +-
 .../planner/plan/node/process/GroupByTagNode.java  | 312 +++++++++++++
 .../plan/parameter/AggregationDescriptor.java      |   4 +-
 ....java => CrossSeriesAggregationDescriptor.java} |  16 +-
 .../statement/component/GroupByTagComponent.java   |  38 ++
 .../db/mpp/plan/statement/crud/QueryStatement.java |  22 +
 .../statement/internal/SchemaFetchStatement.java   |   9 +-
 .../qp/strategy/optimizer/ConcatPathOptimizer.java |   3 +-
 .../apache/iotdb/db/qp/utils/WildcardsRemover.java |   8 +-
 .../db/metadata/cache/DataNodeSchemaCacheTest.java |  22 +-
 .../iotdb/db/metadata/mtree/MTreeBelowSGTest.java  |  16 +-
 .../db/metadata/path/MeasurementPathTest.java      |  75 ++-
 .../mpp/execution/operator/OperatorMemoryTest.java |   7 +-
 .../schema/SchemaFetchScanOperatorTest.java        |   3 +-
 .../plan/analyze/AggregationDescriptorTest.java    |  19 +-
 .../iotdb/db/mpp/plan/analyze/AnalyzeTest.java     |   2 +-
 .../mpp/plan/analyze/FakePartitionFetcherImpl.java |   0
 .../db/mpp/plan/analyze/FakeSchemaFetcherImpl.java |   9 +
 .../db/mpp/plan/parser/StatementGeneratorTest.java |  16 +
 .../iotdb/db/mpp/plan/plan/LogicalPlannerTest.java |  93 +++-
 .../db/mpp/plan/plan/QueryLogicalPlanUtil.java     |  20 +-
 .../distribution/AggregationDistributionTest.java  |  24 +-
 .../metadata/read/SchemaFetchScanNodeTest.java     |   7 +-
 .../node/process/GroupByLevelNodeSerdeTest.java    |   4 +-
 .../plan/node/process/GroupByTagNodeSerdeTest.java | 122 +++++
 .../iotdb/tsfile/utils/ReadWriteIOUtils.java       |   2 +-
 65 files changed, 2754 insertions(+), 235 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
index ff766af5d2..11c5ad6685 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
@@ -178,4 +178,4 @@ keyWords
     | WITH
     | WITHOUT
     | PRIVILEGE_VALUE
-    ;
\ No newline at end of file
+    ;
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 87c880e7bd..6edc1f5771 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -394,6 +394,7 @@ specialClause
     | groupByTimeClause havingClause? orderByClause? specialLimit? #groupByTimeStatement
     | groupByFillClause havingClause? orderByClause? specialLimit? #groupByFillStatement
     | groupByLevelClause havingClause? orderByClause? specialLimit? #groupByLevelStatement
+    | groupByTagClause orderByClause? #groupByTagStatement
     | fillClause orderByClause? specialLimit? #fillStatement
     ;
 
@@ -440,6 +441,8 @@ groupByTimeClause
     : GROUP BY LR_BRACKET timeRange COMMA DURATION_LITERAL (COMMA DURATION_LITERAL)? fillClause? RR_BRACKET
     | GROUP BY LR_BRACKET timeRange COMMA DURATION_LITERAL (COMMA DURATION_LITERAL)? RR_BRACKET
     COMMA LEVEL operator_eq INTEGER_LITERAL (COMMA INTEGER_LITERAL)* fillClause?
+    | GROUP BY LR_BRACKET timeRange COMMA DURATION_LITERAL (COMMA DURATION_LITERAL)? RR_BRACKET
+    COMMA TAGS LR_BRACKET identifier (COMMA identifier)* RR_BRACKET
     ;
 
 groupByFillClause
@@ -451,6 +454,10 @@ groupByLevelClause
     : GROUP BY LEVEL operator_eq INTEGER_LITERAL (COMMA INTEGER_LITERAL)* fillClause?
     ;
 
+groupByTagClause
+    : GROUP BY TAGS LR_BRACKET identifier (COMMA identifier)* RR_BRACKET
+    ;
+
 fillClause
     : FILL LR_BRACKET (linearClause | previousClause | specificValueClause | previousUntilLastClause | oldTypeClause (COMMA oldTypeClause)*) RR_BRACKET
     ;
@@ -957,4 +964,4 @@ slimitClause
 
 soffsetClause
     : SOFFSET INTEGER_LITERAL
-    ;
\ No newline at end of file
+    ;
diff --git a/docs/UserGuide/Query-Data/Aggregate-Query.md b/docs/UserGuide/Query-Data/Aggregate-Query.md
index 3cc502b4c4..b23d21ef38 100644
--- a/docs/UserGuide/Query-Data/Aggregate-Query.md
+++ b/docs/UserGuide/Query-Data/Aggregate-Query.md
@@ -454,6 +454,180 @@ Total line number = 7
 It costs 0.004s
 ```
 
+## Aggregation By Tags
+
+IoTDB allows you to do aggregation queries with the tags defined in timeseries through the `GROUP BY TAGS` clause as well.
+
+Firstly, we can put these example data into IoTDB, which will be used in the following feature introduction.
+
+These are the temperature data of the workshops, which belong to the factory `factory1` and are located in different cities. The time range is `[1000, 10000)`.
+
+The device node of the timeseries path is the ID of the device. The information of city and workshop is modelled in the tags `city` and `workshop`.
+The devices `d1` and `d2` belong to the workshop `w1` in `Beijing`.
+`d3` and `d4` belong to the workshop `w2` in `Beijing`.
+`d5` and `d6` belong to the workshop `w1` in `Shanghai`.
+`d7` belongs to the workshop `w2` in `Shanghai`.
+`d8` and `d9` are under maintenance, and don't belong to any workshops, so they have no tags.
+
+
+```SQL
+set storage group to root.factory1;
+create timeseries root.factory1.d1.temperature with datatype=FLOAT tags(city=Beijing, workshop=w1);
+create timeseries root.factory1.d2.temperature with datatype=FLOAT tags(city=Beijing, workshop=w1);
+create timeseries root.factory1.d3.temperature with datatype=FLOAT tags(city=Beijing, workshop=w2);
+create timeseries root.factory1.d4.temperature with datatype=FLOAT tags(city=Beijing, workshop=w2);
+create timeseries root.factory1.d5.temperature with datatype=FLOAT tags(city=Shanghai, workshop=w1);
+create timeseries root.factory1.d6.temperature with datatype=FLOAT tags(city=Shanghai, workshop=w1);
+create timeseries root.factory1.d7.temperature with datatype=FLOAT tags(city=Shanghai, workshop=w2);
+create timeseries root.factory1.d8.temperature with datatype=FLOAT;
+create timeseries root.factory1.d9.temperature with datatype=FLOAT;
+
+insert into root.factory1.d1(time, temperature) values(1000, 104.0);
+insert into root.factory1.d1(time, temperature) values(3000, 104.2);
+insert into root.factory1.d1(time, temperature) values(5000, 103.3);
+insert into root.factory1.d1(time, temperature) values(7000, 104.1);
+
+insert into root.factory1.d2(time, temperature) values(1000, 104.4);
+insert into root.factory1.d2(time, temperature) values(3000, 103.7);
+insert into root.factory1.d2(time, temperature) values(5000, 103.3);
+insert into root.factory1.d2(time, temperature) values(7000, 102.9);
+
+insert into root.factory1.d3(time, temperature) values(1000, 103.9);
+insert into root.factory1.d3(time, temperature) values(3000, 103.8);
+insert into root.factory1.d3(time, temperature) values(5000, 102.7);
+insert into root.factory1.d3(time, temperature) values(7000, 106.9);
+
+insert into root.factory1.d4(time, temperature) values(1000, 103.9);
+insert into root.factory1.d4(time, temperature) values(5000, 102.7);
+insert into root.factory1.d4(time, temperature) values(7000, 106.9);
+
+insert into root.factory1.d5(time, temperature) values(1000, 112.9);
+insert into root.factory1.d5(time, temperature) values(7000, 113.0);
+
+insert into root.factory1.d6(time, temperature) values(1000, 113.9);
+insert into root.factory1.d6(time, temperature) values(3000, 113.3);
+insert into root.factory1.d6(time, temperature) values(5000, 112.7);
+insert into root.factory1.d6(time, temperature) values(7000, 112.3);
+
+insert into root.factory1.d7(time, temperature) values(1000, 101.2);
+insert into root.factory1.d7(time, temperature) values(3000, 99.3);
+insert into root.factory1.d7(time, temperature) values(5000, 100.1);
+insert into root.factory1.d7(time, temperature) values(7000, 99.8);
+
+insert into root.factory1.d8(time, temperature) values(1000, 50.0);
+insert into root.factory1.d8(time, temperature) values(3000, 52.1);
+insert into root.factory1.d8(time, temperature) values(5000, 50.1);
+insert into root.factory1.d8(time, temperature) values(7000, 50.5);
+
+insert into root.factory1.d9(time, temperature) values(1000, 50.3);
+insert into root.factory1.d9(time, temperature) values(3000, 52.1);
+```
+
+### Aggregation query by one single tag
+
+If the user wants to know the average temperature of each workshop, he can query like this
+
+```SQL
+SELECT AVG(temperature) FROM root.factory1.** GROUP BY TAGS(city);
+```
+
+The query will calculate the average of the temperatures of those timeseries which have the same tag value of the key `city`.
+The results are
+
+```
++--------+------------------+
+|    city|  avg(temperature)|
++--------+------------------+
+| Beijing|104.04666697184244|
+|Shanghai|107.85000076293946|
+|    NULL| 50.84999910990397|
++--------+------------------+
+Total line number = 3
+It costs 0.231s
+```
+
+From the results we can see that the differences between aggregation by tags query and aggregation by time or level query are:
+1. Aggregation query by tags will no longer expand wildcards into raw timeseries, but will aggregate over the data of multiple timeseries that have the same tag value.
+2. Except for the aggregate result column, the result set contains the key-value column of the grouped tag. The column name is the tag key, and the values in the column are the tag values that are present in the searched timeseries.
+If some searched timeseries don't have the grouped tag, a `NULL` value will be presented in the key-value column of the grouped tag, representing the aggregation of all the timeseries lacking that tag key.
+
+### Aggregation query by multiple tags
+
+Except for the aggregation query by one single tag, aggregation query by multiple tags in a particular order is allowed as well.
+
+For example, a user wants to know the average temperature of the devices in each workshop.
+As the workshop names may be the same in different cities, it's not correct to aggregate by the tag `workshop` directly.
+So the aggregation by the tag `city` should be done first, and then by the tag `workshop`.
+
+SQL
+
+```SQL
+SELECT avg(temperature) FROM root.factory1.** GROUP BY TAGS(city, workshop);
+```
+
+The results
+
+```
++--------+--------+------------------+
+|    city|workshop|  avg(temperature)|
++--------+--------+------------------+
+|    NULL|    NULL| 50.84999910990397|
+|Shanghai|      w1|113.01666768391927|
+| Beijing|      w2| 104.4000004359654|
+|Shanghai|      w2|100.10000038146973|
+| Beijing|      w1|103.73750019073486|
++--------+--------+------------------+
+Total line number = 5
+It costs 0.027s
+```
+
+We can see that in a multiple tags aggregation query, the result set will output the key-value columns of all the grouped tag keys, in the same order as they appear in `GROUP BY TAGS`.
+
+### Downsampling Aggregation by tags based on Time Window
+
+Downsampling aggregation by time window is one of the most popular features in a time series database. IoTDB supports doing aggregation queries by tags based on time windows.
+
+For example, a user wants to know the average temperature of the devices in each workshop, in every 5 seconds, in the range of time `[1000, 10000)`.
+
+SQL
+
+```SQL
+SELECT avg(temperature) FROM root.factory1.** GROUP BY ([1000, 10000), 5s), TAGS(city, workshop);
+```
+
+The results
+
+```
++-----------------------------+--------+--------+------------------+
+|                         Time|    city|workshop|  avg(temperature)|
++-----------------------------+--------+--------+------------------+
+|1970-01-01T08:00:01.000+08:00|    NULL|    NULL| 50.91999893188476|
+|1970-01-01T08:00:01.000+08:00|Shanghai|      w1|113.20000076293945|
+|1970-01-01T08:00:01.000+08:00| Beijing|      w2|             103.4|
+|1970-01-01T08:00:01.000+08:00|Shanghai|      w2| 100.1999994913737|
+|1970-01-01T08:00:01.000+08:00| Beijing|      w1|103.81666692097981|
+|1970-01-01T08:00:06.000+08:00|    NULL|    NULL|              50.5|
+|1970-01-01T08:00:06.000+08:00|Shanghai|      w1| 112.6500015258789|
+|1970-01-01T08:00:06.000+08:00| Beijing|      w2| 106.9000015258789|
+|1970-01-01T08:00:06.000+08:00|Shanghai|      w2| 99.80000305175781|
+|1970-01-01T08:00:06.000+08:00| Beijing|      w1|             103.5|
++-----------------------------+--------+--------+------------------+
+```
+
+Compared to the pure tag aggregations, this kind of aggregation will first divide the data according to the time window specification, and then do the aggregation query by the multiple tags within each time window.
+The result set will also contain a time column, which has the same meaning as the time column of the result in a downsampling aggregation query by time window.
+
+### Limitation of Aggregation by Tags
+
+As this feature is still under development, some queries have not been completed yet and will be supported in the future.
+
+> 1. Temporarily not support `HAVING` clause to filter the results.
+> 2. Temporarily not support ordering by tag values.
+> 3. Temporarily not support `LIMIT`,`OFFSET`,`SLIMIT`,`SOFFSET`.
+> 4. Temporarily not support `ALIGN BY DEVICE`.
+> 5. Temporarily not support expressions as aggregation function parameters, e.g. `count(s+1)`.
+> 6. Not support the value filter, which is consistent with the behavior of the `GROUP BY LEVEL` query.
+
 ## Aggregate result filtering
 
 If you want to filter the results of aggregate queries, 
diff --git a/docs/zh/UserGuide/Query-Data/Aggregate-Query.md b/docs/zh/UserGuide/Query-Data/Aggregate-Query.md
index 12e34e0e16..db44805857 100644
--- a/docs/zh/UserGuide/Query-Data/Aggregate-Query.md
+++ b/docs/zh/UserGuide/Query-Data/Aggregate-Query.md
@@ -448,9 +448,175 @@ Total line number = 7
 It costs 0.004s
 ```
 
+## 标签聚合查询
+
+IoTDB 还支持通过 `GROUP BY TAGS` 语句根据时间序列中定义的标签的键值做聚合查询。
+
+我们先在 IoTDB 中写入如下示例数据,稍后会以这些数据为例介绍标签聚合查询。
+
+这些是某工厂 `factory1` 在多个城市的多个车间的设备温度数据, 时间范围为 [1000, 10000)。
+
+时间序列路径中的设备一级是设备唯一标识。城市信息 `city` 和车间信息 `workshop` 则被建模在该设备时间序列的标签中。
+其中,设备 `d1`、`d2` 在 `Beijing` 的 `w1` 车间, `d3`、`d4` 在 `Beijing` 的 `w2` 车间,`d5`、`d6` 在 `Shanghai` 的 `w1` 车间,`d7` 在 `Shanghai` 的 `w2` 车间。
+`d8` 和 `d9` 设备目前处于调试阶段,还未被分配到具体的城市和车间,所以其相应的标签值为空值。
+
+```SQL
+set storage group to root.factory1;
+create timeseries root.factory1.d1.temperature with datatype=FLOAT tags(city=Beijing, workshop=w1);
+create timeseries root.factory1.d2.temperature with datatype=FLOAT tags(city=Beijing, workshop=w1);
+create timeseries root.factory1.d3.temperature with datatype=FLOAT tags(city=Beijing, workshop=w2);
+create timeseries root.factory1.d4.temperature with datatype=FLOAT tags(city=Beijing, workshop=w2);
+create timeseries root.factory1.d5.temperature with datatype=FLOAT tags(city=Shanghai, workshop=w1);
+create timeseries root.factory1.d6.temperature with datatype=FLOAT tags(city=Shanghai, workshop=w1);
+create timeseries root.factory1.d7.temperature with datatype=FLOAT tags(city=Shanghai, workshop=w2);
+create timeseries root.factory1.d8.temperature with datatype=FLOAT;
+create timeseries root.factory1.d9.temperature with datatype=FLOAT;
+
+insert into root.factory1.d1(time, temperature) values(1000, 104.0);
+insert into root.factory1.d1(time, temperature) values(3000, 104.2);
+insert into root.factory1.d1(time, temperature) values(5000, 103.3);
+insert into root.factory1.d1(time, temperature) values(7000, 104.1);
+
+insert into root.factory1.d2(time, temperature) values(1000, 104.4);
+insert into root.factory1.d2(time, temperature) values(3000, 103.7);
+insert into root.factory1.d2(time, temperature) values(5000, 103.3);
+insert into root.factory1.d2(time, temperature) values(7000, 102.9);
+
+insert into root.factory1.d3(time, temperature) values(1000, 103.9);
+insert into root.factory1.d3(time, temperature) values(3000, 103.8);
+insert into root.factory1.d3(time, temperature) values(5000, 102.7);
+insert into root.factory1.d3(time, temperature) values(7000, 106.9);
+
+insert into root.factory1.d4(time, temperature) values(1000, 103.9);
+insert into root.factory1.d4(time, temperature) values(5000, 102.7);
+insert into root.factory1.d4(time, temperature) values(7000, 106.9);
+
+insert into root.factory1.d5(time, temperature) values(1000, 112.9);
+insert into root.factory1.d5(time, temperature) values(7000, 113.0);
+
+insert into root.factory1.d6(time, temperature) values(1000, 113.9);
+insert into root.factory1.d6(time, temperature) values(3000, 113.3);
+insert into root.factory1.d6(time, temperature) values(5000, 112.7);
+insert into root.factory1.d6(time, temperature) values(7000, 112.3);
+
+insert into root.factory1.d7(time, temperature) values(1000, 101.2);
+insert into root.factory1.d7(time, temperature) values(3000, 99.3);
+insert into root.factory1.d7(time, temperature) values(5000, 100.1);
+insert into root.factory1.d7(time, temperature) values(7000, 99.8);
+
+insert into root.factory1.d8(time, temperature) values(1000, 50.0);
+insert into root.factory1.d8(time, temperature) values(3000, 52.1);
+insert into root.factory1.d8(time, temperature) values(5000, 50.1);
+insert into root.factory1.d8(time, temperature) values(7000, 50.5);
+
+insert into root.factory1.d9(time, temperature) values(1000, 50.3);
+insert into root.factory1.d9(time, temperature) values(3000, 52.1);
+```
+
+### 单标签聚合查询
+
+用户想统计该工厂每个地区的设备的温度的平均值,可以使用如下查询语句
+
+```SQL
+SELECT AVG(temperature) FROM root.factory1.** GROUP BY TAGS(city);
+```
+
+该查询会将具有同一个 `city` 标签值的时间序列的所有满足查询条件的点做平均值计算,计算结果如下
+
+```
++--------+------------------+
+|    city|  avg(temperature)|
++--------+------------------+
+| Beijing|104.04666697184244|
+|Shanghai|107.85000076293946|
+|    NULL| 50.84999910990397|
++--------+------------------+
+Total line number = 3
+It costs 0.231s
+```
+
+从结果集中可以看到,和时间区间聚合、按层次聚合相比,标签聚合的查询结果的不同点是:
+1. 标签聚合查询的聚合结果不会再做去星号展开,而是将多个时间序列的数据作为一个整体进行聚合计算。
+2. 标签聚合查询除了输出聚合结果列,还会输出聚合标签的键值列。该列的列名为聚合指定的标签键,列的值则为所有查询的时间序列中出现的该标签的值。
+如果某些时间序列未设置该标签,则在键值列中有一行单独的 `NULL` ,代表未设置标签的所有时间序列数据的聚合结果。
+
+### 多标签聚合查询
+
+除了基本的单标签聚合查询外,还可以按顺序指定多个标签进行聚合计算。
+
+例如,用户想统计每个城市的每个车间内设备的平均温度。但因为各个城市的车间名称有可能相同,所以不能直接按照 `workshop` 做标签聚合。必须要先按照城市,再按照车间处理。
+
+SQL 语句如下
+
+```SQL
+SELECT avg(temperature) FROM root.factory1.** GROUP BY TAGS(city, workshop);
+```
+
+查询结果如下
+
+```
++--------+--------+------------------+
+|    city|workshop|  avg(temperature)|
++--------+--------+------------------+
+|    NULL|    NULL| 50.84999910990397|
+|Shanghai|      w1|113.01666768391927|
+| Beijing|      w2| 104.4000004359654|
+|Shanghai|      w2|100.10000038146973|
+| Beijing|      w1|103.73750019073486|
++--------+--------+------------------+
+Total line number = 5
+It costs 0.027s
+```
+
+从结果集中可以看到,和单标签聚合相比,多标签聚合的查询结果会根据指定的标签顺序,输出相应标签的键值列。
+
+### 基于时间区间的标签聚合查询
+
+按照时间区间聚合是时序数据库中最常用的查询需求之一。IoTDB 在基于时间区间的聚合基础上,支持进一步按照标签进行聚合查询。
+
+例如,用户想统计时间 `[1000, 10000)` 范围内,每个城市每个车间中的设备每 5 秒内的平均温度。
+
+SQL 语句如下
+
+```SQL
+SELECT AVG(temperature) FROM root.factory1.** GROUP BY ([1000, 10000), 5s), TAGS(city, workshop);
+```
+
+查询结果如下
+
+```
++-----------------------------+--------+--------+------------------+
+|                         Time|    city|workshop|  avg(temperature)|
++-----------------------------+--------+--------+------------------+
+|1970-01-01T08:00:01.000+08:00|    NULL|    NULL| 50.91999893188476|
+|1970-01-01T08:00:01.000+08:00|Shanghai|      w1|113.20000076293945|
+|1970-01-01T08:00:01.000+08:00| Beijing|      w2|             103.4|
+|1970-01-01T08:00:01.000+08:00|Shanghai|      w2| 100.1999994913737|
+|1970-01-01T08:00:01.000+08:00| Beijing|      w1|103.81666692097981|
+|1970-01-01T08:00:06.000+08:00|    NULL|    NULL|              50.5|
+|1970-01-01T08:00:06.000+08:00|Shanghai|      w1| 112.6500015258789|
+|1970-01-01T08:00:06.000+08:00| Beijing|      w2| 106.9000015258789|
+|1970-01-01T08:00:06.000+08:00|Shanghai|      w2| 99.80000305175781|
+|1970-01-01T08:00:06.000+08:00| Beijing|      w1|             103.5|
++-----------------------------+--------+--------+------------------+
+```
+
+和标签聚合相比,基于时间区间的标签聚合的查询会首先按照时间区间划定聚合范围,在时间区间内部再根据指定的标签顺序,进行相应数据的聚合计算。在输出的结果集中,会包含一列时间列,该时间列值的含义和时间区间聚合查询的相同。
+
+### 标签聚合查询的限制
+
+由于标签聚合功能仍然处于开发阶段,目前有如下未实现功能。
+
+> 1. 暂不支持 `HAVING` 子句过滤查询结果。
+> 2. 暂不支持结果按照标签值排序。
+> 3. 暂不支持 `LIMIT`,`OFFSET`,`SLIMIT`,`SOFFSET`。
+> 4. 暂不支持 `ALIGN BY DEVICE`。
+> 5. 暂不支持聚合函数内部包含表达式,例如 `count(s+1)`。
+> 6. 不支持值过滤条件聚合,和分层聚合查询行为保持一致。
+
 ## 聚合结果过滤
 
-如果想对聚合查询的结果进行过滤,可以在`GROUP BY`子句之后使用`HAVING`子句
+如果想对聚合查询的结果进行过滤,可以在 `GROUP BY` 子句之后使用 `HAVING` 子句
 
 > 注意:
 > 
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBTagAggregationIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBTagAggregationIT.java
new file mode 100644
index 0000000000..c3a6989a1b
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBTagAggregationIT.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.aggregation;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.itbase.category.ClusterIT;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.fail;
+
+@Category({ClusterIT.class})
+public class IoTDBTagAggregationIT {
+  private static final String[] DATASET =
+      new String[] {
+        "set storage group to root.sg.a;",
+        "set storage group to root.sg.b;",
+        "set storage group to root.sg2.c;",
+        "create timeseries root.sg.a.d1.t with datatype=FLOAT tags(k1=k1v1, k2=k2v1, k3=k3v1);",
+        "create timeseries root.sg.b.d2.t with datatype=FLOAT tags(k1=k1v1, k2=k2v2);",
+        "create timeseries root.sg.a.d3.t with datatype=FLOAT tags(k1=k1v2, k2=k2v1);",
+        "create timeseries root.sg.b.d4.t with datatype=FLOAT tags(k1=k1v2, k2=k2v2);",
+        "create timeseries root.sg.a.d5.t with datatype=FLOAT tags(k1=k1v1);",
+        "create timeseries root.sg.b.d6.t with datatype=FLOAT tags(k2=k2v1);",
+        "create timeseries root.sg.a.d7.t with datatype=FLOAT;",
+        "create timeseries root.sg2.c.d8.t with datatype=TEXT tags(k3=k3v1);",
+        "insert into root.sg.a.d1(time, t) values(1, 1.1);",
+        "insert into root.sg.b.d2(time, t) values(1, 1.2);",
+        "insert into root.sg.a.d3(time, t) values(1, 1.3);",
+        "insert into root.sg.b.d4(time, t) values(1, 1.4);",
+        "insert into root.sg.a.d5(time, t) values(1, 1.5);",
+        "insert into root.sg.b.d6(time, t) values(1, 1.6);",
+        "insert into root.sg.a.d7(time, t) values(1, 1.7);",
+        "insert into root.sg2.c.d8(time, t) values(1, 'abc');",
+        "insert into root.sg.a.d1(time, t) values(10, 2.1);",
+        "insert into root.sg.b.d2(time, t) values(10, 3.2);",
+        "insert into root.sg.a.d3(time, t) values(10, 4.3);",
+        "insert into root.sg.b.d4(time, t) values(10, 5.4);",
+        "insert into root.sg.a.d5(time, t) values(10, 6.5);",
+        "insert into root.sg.b.d6(time, t) values(10, 7.6);",
+        "insert into root.sg.a.d7(time, t) values(10, 8.7);"
+      };
+
+  private static final double DELTA = 0.001D;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().initBeforeClass();
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      for (String sql : DATASET) {
+        statement.execute(sql);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw e;
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanAfterClass();
+  }
+
+  @Test
+  public void testAggregateFunctions() {
+    String query =
+        "SELECT COUNT(t), AVG(t), MAX_TIME(t), MIN_TIME(t), MAX_VALUE(t), MIN_VALUE(t), EXTREME(t) FROM root.sg.** GROUP BY TAGS(k1)";
+    // Expected result set:
+    // +----+--------+------------------+-----------+-----------+------------+------------+----------+
+    // |  k1|count(t)|
+    // avg(t)|max_time(t)|min_time(t)|max_value(t)|min_value(t)|extreme(t)|
+    // +----+--------+------------------+-----------+-----------+------------+------------+----------+
+    // |k1v2|       4|3.1000000536441803|         10|          1|         5.4|         1.3|
+    // 5.4|
+    // |k1v1|       6| 2.600000003973643|         10|          1|         6.5|         1.1|
+    // 6.5|
+    // |NULL|       4|  4.89999994635582|         10|          1|         8.7|         1.6|
+    // 8.7|
+    // +----+--------+------------------+-----------+-----------+------------+------------+----------+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      try (ResultSet resultSet = statement.executeQuery(query)) {
+        Assert.assertEquals(8, resultSet.getMetaData().getColumnCount());
+        Assert.assertTrue(resultSet.next());
+        Assert.assertEquals("k1v2", resultSet.getString("k1"));
+        Assert.assertEquals(4L, resultSet.getLong("count(t)"));
+        Assert.assertEquals(3.1D, resultSet.getDouble("avg(t)"), DELTA);
+        Assert.assertEquals(10L, resultSet.getLong("max_time(t)"));
+        Assert.assertEquals(1L, resultSet.getLong("min_time(t)"));
+        Assert.assertEquals(5.4F, resultSet.getFloat("max_value(t)"), DELTA);
+        Assert.assertEquals(1.3F, resultSet.getFloat("min_value(t)"), DELTA);
+        Assert.assertEquals(5.4F, resultSet.getFloat("extreme(t)"), DELTA);
+        Assert.assertTrue(resultSet.next());
+        Assert.assertEquals("k1v1", resultSet.getString(1));
+        Assert.assertEquals(6L, resultSet.getLong(2));
+        Assert.assertEquals(2.6D, resultSet.getDouble(3), DELTA);
+        Assert.assertEquals(10L, resultSet.getLong(4));
+        Assert.assertEquals(1L, resultSet.getLong(5));
+        Assert.assertEquals(6.5F, resultSet.getFloat(6), DELTA);
+        Assert.assertEquals(1.1F, resultSet.getFloat(7), DELTA);
+        Assert.assertEquals(6.5F, resultSet.getFloat(8), DELTA);
+        Assert.assertTrue(resultSet.next());
+        Assert.assertEquals("NULL", resultSet.getString(1));
+        Assert.assertEquals(4L, resultSet.getLong(2));
+        Assert.assertEquals(4.9D, resultSet.getDouble(3), DELTA);
+        Assert.assertEquals(10L, resultSet.getLong(4));
+        Assert.assertEquals(1L, resultSet.getLong(5));
+        Assert.assertEquals(8.7D, resultSet.getFloat(6), DELTA);
+        Assert.assertEquals(1.6D, resultSet.getFloat(7), DELTA);
+        Assert.assertEquals(8.7D, resultSet.getFloat(8), DELTA);
+        Assert.assertFalse(resultSet.next());
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  @Ignore
+  public void testAggregateFunctionsWithNestedExpression() {
+    String query = "SELECT COUNT(t + 1), AVG(t + 1) FROM root.sg.** GROUP BY TAGS(k1)";
+    // Expected result set:
+    // +----+------------+------------------+
+    // |  k1|count(t + 1)|        avg(t + 1)|
+    // +----+------------+------------------+
+    // |k1v2|           4|3.1000000536441803|
+    // |k1v1|           6| 3.600000003973643|
+    // |NULL|           4|  5.89999994635582|
+    // +----+------------+------------------+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      try (ResultSet resultSet = statement.executeQuery(query)) {
+        Assert.assertEquals(3, resultSet.getMetaData().getColumnCount());
+        Assert.assertTrue(resultSet.next());
+        Assert.assertEquals("k1v2", resultSet.getString("k1"));
+        Assert.assertEquals(4L, resultSet.getLong("count(t + 1)"));
+        Assert.assertEquals(4.1D, resultSet.getDouble("avg(t + 1)"), DELTA);
+        Assert.assertTrue(resultSet.next());
+        Assert.assertEquals("k1v1", resultSet.getString(1));
+        Assert.assertEquals(6L, resultSet.getLong(2));
+        Assert.assertEquals(3.6D, resultSet.getDouble(3), DELTA);
+        Assert.assertTrue(resultSet.next());
+        Assert.assertEquals("NULL", resultSet.getString(1));
+        Assert.assertEquals(4L, resultSet.getLong(2));
+        Assert.assertEquals(5.9D, resultSet.getDouble(3), DELTA);
+        Assert.assertFalse(resultSet.next());
+        Assert.assertFalse(resultSet.next());
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  @Ignore // TODO: support having in later commits
+  public void testAggregateFunctionsWithHaving() {
+    String query =
+        "SELECT COUNT(t), AVG(t), MAX_TIME(t), MIN_TIME(t), MAX_VALUE(t), MIN_VALUE(t), EXTREME(t) FROM root.sg.** GROUP BY TAGS(k1) HAVING avg(t) > 3";
+    // Expected result set:
+    // +----+--------+------------------+-----------+-----------+------------+------------+----------+
+    // |  k1|count(t)|
+    // avg(t)|max_time(t)|min_time(t)|max_value(t)|min_value(t)|extreme(t)|
+    // +----+--------+------------------+-----------+-----------+------------+------------+----------+
+    // |k1v2|       4|3.1000000536441803|         10|          1|         5.4|         1.3|
+    // 5.4|
+    // |NULL|       4|  4.89999994635582|         10|          1|         8.7|         1.6|
+    // 8.7|
+    // +----+--------+------------------+-----------+-----------+------------+------------+----------+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      try (ResultSet resultSet = statement.executeQuery(query)) {
+        Assert.assertEquals(8, resultSet.getMetaData().getColumnCount());
+        Assert.assertTrue(resultSet.next());
+        Assert.assertEquals("k1v2", resultSet.getString("k1"));
+        Assert.assertEquals(4L, resultSet.getLong("count(t)"));
+        Assert.assertEquals(3.1D, resultSet.getDouble("avg(t)"), DELTA);
+        Assert.assertEquals(10L, resultSet.getLong("max_time(t)"));
+        Assert.assertEquals(1L, resultSet.getLong("min_time(t)"));
+        Assert.assertEquals(5.4F, resultSet.getFloat("max_value(t)"), DELTA);
+        Assert.assertEquals(1.3F, resultSet.getFloat("min_value(t)"), DELTA);
+        Assert.assertEquals(5.4F, resultSet.getFloat("extreme(t)"), DELTA);
+        Assert.assertTrue(resultSet.next());
+        Assert.assertEquals("NULL", resultSet.getString(1));
+        Assert.assertEquals(4L, resultSet.getLong(2));
+        Assert.assertEquals(4.9D, resultSet.getDouble(3), DELTA);
+        Assert.assertEquals(10L, resultSet.getLong(4));
+        Assert.assertEquals(1L, resultSet.getLong(5));
+        Assert.assertEquals(8.7F, resultSet.getFloat(6), DELTA);
+        Assert.assertEquals(1.6F, resultSet.getFloat(7), DELTA);
+        Assert.assertEquals(8.7F, resultSet.getFloat(8), DELTA);
+        Assert.assertFalse(resultSet.next());
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testMultipleAggregationKeys() {
+    String query = "SELECT COUNT(t) FROM root.sg.** GROUP BY TAGS(k1, k2)";
+    // Expected result set:
+    // +----+----+--------+
+    // |  k1|  k2|count(t)|
+    // +----+----+--------+
+    // |NULL|NULL|       2|
+    // |NULL|k2v1|       2|
+    // |k1v1|NULL|       2|
+    // |k1v2|k2v1|       2|
+    // |k1v1|k2v2|       2|
+    // |k1v1|k2v1|       2|
+    // |k1v2|k2v2|       2|
+    // +----+----+--------+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      try (ResultSet resultSet = statement.executeQuery(query)) {
+        Assert.assertEquals(3, resultSet.getMetaData().getColumnCount());
+        Set<List<String>> groups = new HashSet<>();
+        for (int i = 0; i < 7; i++) {
+          Assert.assertTrue(resultSet.next());
+          List<String> tagValues = new ArrayList<>(2);
+          tagValues.add(resultSet.getString("k1"));
+          tagValues.add(resultSet.getString("k2"));
+          groups.add(tagValues);
+          Assert.assertEquals(2L, resultSet.getLong("count(t)"));
+        }
+        Assert.assertFalse(resultSet.next());
+        Assert.assertEquals(7, groups.size());
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testAlongWithTimeAggregation() {
+    String query = "SELECT COUNT(t) from root.sg.** GROUP BY ([0, 20), 10ms), TAGS(k1)";
+    // Expected result set:
+    // +-----------------------------+----+--------+
+    // |                         Time|  k1|count(t)|
+    // +-----------------------------+----+--------+
+    // |1970-01-01T08:00:00.000+08:00|k1v2|       2|
+    // |1970-01-01T08:00:00.000+08:00|k1v1|       3|
+    // |1970-01-01T08:00:00.000+08:00|NULL|       2|
+    // |1970-01-01T08:00:00.010+08:00|k1v2|       2|
+    // |1970-01-01T08:00:00.010+08:00|k1v1|       3|
+    // |1970-01-01T08:00:00.010+08:00|NULL|       2|
+    // +-----------------------------+----+--------+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      try (ResultSet resultSet = statement.executeQuery(query)) {
+        Assert.assertEquals(3, resultSet.getMetaData().getColumnCount());
+        Set<String> groups = new HashSet<>();
+        for (int i = 0; i < 6; i++) {
+          Assert.assertTrue(resultSet.next());
+          if (i < 3) {
+            Assert.assertEquals(0L, resultSet.getLong("Time"));
+          } else {
+            Assert.assertEquals(10L, resultSet.getLong("Time"));
+          }
+          String tagValue = resultSet.getString("k1");
+          switch (tagValue) {
+            case "k1v1":
+              Assert.assertEquals(3L, resultSet.getLong("count(t)"));
+              break;
+            case "k1v2":
+              Assert.assertEquals(2L, resultSet.getLong("count(t)"));
+              break;
+            case "NULL":
+              Assert.assertEquals(2L, resultSet.getLong("count(t)"));
+              break;
+            default:
+              fail("Unexpected tag value: " + tagValue);
+          }
+          groups.add(tagValue);
+        }
+        Assert.assertEquals(3, groups.size());
+        Assert.assertFalse(resultSet.next());
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testAlongWithSlidingWindow() {
+    String query = "SELECT COUNT(t) from root.sg.** GROUP BY ([0, 20), 15ms, 5ms), TAGS(k1)";
+    // Expected result set:
+    // +-----------------------------+----+--------+
+    // |                         Time|  k1|count(t)|
+    // +-----------------------------+----+--------+
+    // |1970-01-01T08:00:00.000+08:00|k1v2|       4|
+    // |1970-01-01T08:00:00.000+08:00|k1v1|       6|
+    // |1970-01-01T08:00:00.000+08:00|NULL|       4|
+    // |1970-01-01T08:00:00.005+08:00|k1v2|       2|
+    // |1970-01-01T08:00:00.005+08:00|k1v1|       3|
+    // |1970-01-01T08:00:00.005+08:00|NULL|       2|
+    // |1970-01-01T08:00:00.010+08:00|k1v2|       2|
+    // |1970-01-01T08:00:00.010+08:00|k1v1|       3|
+    // |1970-01-01T08:00:00.010+08:00|NULL|       2|
+    // |1970-01-01T08:00:00.015+08:00|k1v2|       0|
+    // |1970-01-01T08:00:00.015+08:00|k1v1|       0|
+    // |1970-01-01T08:00:00.015+08:00|NULL|       0|
+    // +-----------------------------+----+--------+
+    long[][] expectedValue = new long[][] {{4L, 6L, 4L}, {2L, 3L, 2L}, {2L, 3L, 2L}, {0L, 0L, 0L}};
+    long[] expectedTime = new long[] {0L, 5L, 10L, 15L};
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      try (ResultSet resultSet = statement.executeQuery(query)) {
+        Assert.assertEquals(3, resultSet.getMetaData().getColumnCount());
+        for (int i = 0; i < 4; i++) {
+          for (int j = 0; j < 3; j++) {
+            Assert.assertTrue(resultSet.next());
+            Assert.assertEquals(expectedTime[i], resultSet.getLong("Time"));
+            String tagValue = resultSet.getString("k1");
+            switch (tagValue) {
+              case "k1v2":
+                Assert.assertEquals(expectedValue[i][0], resultSet.getLong("count(t)"));
+                break;
+              case "k1v1":
+                Assert.assertEquals(expectedValue[i][1], resultSet.getLong("count(t)"));
+                break;
+              case "NULL":
+                Assert.assertEquals(expectedValue[i][2], resultSet.getLong("count(t)"));
+                break;
+              default:
+                fail("Unexpected tag value: " + tagValue);
+            }
+          }
+        }
+        Assert.assertFalse(resultSet.next());
+      }
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testAlongWithTimeAggregationAndOrdering() {
+    String query =
+        "SELECT COUNT(t) from root.sg.** GROUP BY ([0, 20), 10ms), TAGS(k1) ORDER BY TIME DESC";
+    // Expected result set:
+    // +-----------------------------+----+--------+
+    // |                         Time|  k1|count(t)|
+    // +-----------------------------+----+--------+
+    // |1970-01-01T08:00:00.010+08:00|k1v2|       2|
+    // |1970-01-01T08:00:00.010+08:00|k1v1|       3|
+    // |1970-01-01T08:00:00.010+08:00|NULL|       2|
+    // |1970-01-01T08:00:00.000+08:00|k1v2|       2|
+    // |1970-01-01T08:00:00.000+08:00|k1v1|       3|
+    // |1970-01-01T08:00:00.000+08:00|NULL|       2|
+    // +-----------------------------+----+--------+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      try (ResultSet resultSet = statement.executeQuery(query)) {
+        Assert.assertEquals(3, resultSet.getMetaData().getColumnCount());
+        Set<String> groups = new HashSet<>();
+        for (int i = 0; i < 6; i++) {
+          Assert.assertTrue(resultSet.next());
+          if (i < 3) {
+            Assert.assertEquals(10L, resultSet.getLong("Time"));
+          } else {
+            Assert.assertEquals(0L, resultSet.getLong("Time"));
+          }
+          String tagValue = resultSet.getString("k1");
+          switch (tagValue) {
+            case "k1v1":
+              Assert.assertEquals(3L, resultSet.getLong("count(t)"));
+              break;
+            case "k1v2":
+              Assert.assertEquals(2L, resultSet.getLong("count(t)"));
+              break;
+            case "NULL":
+              Assert.assertEquals(2L, resultSet.getLong("count(t)"));
+              break;
+            default:
+              fail("Unexpected tag value: " + tagValue);
+          }
+          groups.add(tagValue);
+        }
+        Assert.assertEquals(3, groups.size());
+        Assert.assertFalse(resultSet.next());
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testAlongWithTimeFiltering() {
+    String query = "SELECT COUNT(t) FROM root.sg.** WHERE time > 1 GROUP BY TAGS(k1)";
+    // Expected result set:
+    // +----+--------+
+    // |  k1|count(t)|
+    // +----+--------+
+    // |k1v2|       2|
+    // |k1v1|       3|
+    // |NULL|       2|
+    // +----+--------+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      try (ResultSet resultSet = statement.executeQuery(query)) {
+        Assert.assertEquals(2, resultSet.getMetaData().getColumnCount());
+        Set<String> groups = new HashSet<>();
+        for (int i = 0; i < 3; i++) {
+          Assert.assertTrue(resultSet.next());
+          String tagValue = resultSet.getString("k1");
+          switch (tagValue) {
+            case "k1v1":
+              Assert.assertEquals(3L, resultSet.getLong("count(t)"));
+              break;
+            case "k1v2":
+              Assert.assertEquals(2L, resultSet.getLong("count(t)"));
+              break;
+            case "NULL":
+              Assert.assertEquals(2L, resultSet.getLong("count(t)"));
+              break;
+            default:
+              fail("Unexpected tag value: " + tagValue);
+          }
+          groups.add(tagValue);
+        }
+        Assert.assertEquals(3, groups.size());
+        Assert.assertFalse(resultSet.next());
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testIncompatibleMixedDataTypes() {
+    String query = "SELECT AVG(t) FROM root.** GROUP BY TAGS(k3)";
+    // AVG() with numeric and text timeseries, an exception will be thrown
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      try (ResultSet ignored = statement.executeQuery(query)) {
+        Assert.fail();
+      }
+    } catch (SQLException e) {
+      Assert.assertTrue(e.getMessage().contains("only support numeric data types"));
+    }
+  }
+
+  @Test
+  public void testWithValueFilters() {
+    String query = "SELECT AVG(t) FROM root.sg.** WHERE t > 1.5 GROUP BY TAGS(k1)";
+    // Value filter is not supported yet
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      try (ResultSet ignored = statement.executeQuery(query)) {
+        Assert.fail();
+      }
+    } catch (SQLException e) {
+      Assert.assertTrue(
+          e.getMessage().contains("Only time filters are supported in GROUP BY TAGS query"));
+    }
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java b/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
index ec57075d68..bdb6b2f89c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
@@ -35,6 +35,8 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
 
 public class MeasurementPath extends PartialPath {
 
@@ -42,6 +44,8 @@ public class MeasurementPath extends PartialPath {
 
   private IMeasurementSchema measurementSchema;
 
+  private Map<String, String> tagMap;
+
   private boolean isUnderAlignedEntity = false;
 
   // alias of measurement, null pointer cannot be serialized in thrift so empty string is instead
@@ -91,6 +95,10 @@ public class MeasurementPath extends PartialPath {
     return measurementSchema;
   }
 
+  public Map<String, String> getTagMap() {
+    return tagMap;
+  }
+
   @Override
   public TSDataType getSeriesType() {
     return getMeasurementSchema().getType();
@@ -104,6 +112,10 @@ public class MeasurementPath extends PartialPath {
     this.measurementSchema = measurementSchema;
   }
 
+  public void setTagMap(Map<String, String> tagMap) {
+    this.tagMap = tagMap;
+  }
+
   @Override
   public String getMeasurementAlias() {
     return measurementAlias;
@@ -149,6 +161,9 @@ public class MeasurementPath extends PartialPath {
     result.device = device;
     result.measurementAlias = measurementAlias;
     result.measurementSchema = measurementSchema;
+    if (tagMap != null) {
+      result.tagMap = new HashMap<>(tagMap);
+    }
     result.isUnderAlignedEntity = isUnderAlignedEntity;
     return result;
   }
@@ -168,6 +183,10 @@ public class MeasurementPath extends PartialPath {
       newMeasurementPath =
           new MeasurementPath(this.getDevice(), this.getMeasurement(), this.getMeasurementSchema());
       newMeasurementPath.setUnderAlignedEntity(this.isUnderAlignedEntity);
+      newMeasurementPath.setMeasurementAlias(this.measurementAlias);
+      if (tagMap != null) {
+        newMeasurementPath.setTagMap(new HashMap<>(tagMap));
+      }
     } catch (IllegalPathException e) {
       logger.warn("path is illegal: {}", this.getFullPath(), e);
     }
@@ -189,6 +208,12 @@ public class MeasurementPath extends PartialPath {
       }
       measurementSchema.serializeTo(byteBuffer);
     }
+    if (tagMap == null) {
+      ReadWriteIOUtils.write((byte) 0, byteBuffer);
+    } else {
+      ReadWriteIOUtils.write((byte) 1, byteBuffer);
+      ReadWriteIOUtils.write(tagMap, byteBuffer);
+    }
     ReadWriteIOUtils.write(isUnderAlignedEntity, byteBuffer);
     ReadWriteIOUtils.write(measurementAlias, byteBuffer);
   }
@@ -208,6 +233,12 @@ public class MeasurementPath extends PartialPath {
       }
       measurementSchema.serializeTo(stream);
     }
+    if (tagMap == null) {
+      ReadWriteIOUtils.write((byte) 0, stream);
+    } else {
+      ReadWriteIOUtils.write((byte) 1, stream);
+      ReadWriteIOUtils.write(tagMap, stream);
+    }
     ReadWriteIOUtils.write(isUnderAlignedEntity, stream);
     ReadWriteIOUtils.write(measurementAlias, stream);
   }
@@ -224,6 +255,10 @@ public class MeasurementPath extends PartialPath {
         measurementPath.measurementSchema = VectorMeasurementSchema.deserializeFrom(byteBuffer);
       }
     }
+    isNull = ReadWriteIOUtils.readByte(byteBuffer);
+    if (isNull == 1) {
+      measurementPath.tagMap = ReadWriteIOUtils.readMap(byteBuffer);
+    }
     measurementPath.isUnderAlignedEntity = ReadWriteIOUtils.readBool(byteBuffer);
     measurementPath.measurementAlias = ReadWriteIOUtils.readString(byteBuffer);
     measurementPath.nodes = partialPath.getNodes();
diff --git a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
index 153132a0a7..a39e42217f 100644
--- a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
+++ b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
@@ -118,6 +118,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.ALL_NODE_TYPE_ARRAY;
 import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DEFAULT_ALIGNED_ENTITY_VALUE;
@@ -648,29 +649,28 @@ public class RSchemaRegion implements ISchemaRegion {
           }
           Set<IMNode> tempSet = ConcurrentHashMap.newKeySet();
 
-          parentNeedsToCheck
-              .parallelStream()
-              .forEach(
-                  currentNode -> {
-                    if (!currentNode.isStorageGroup()) {
-                      PartialPath parentPath = currentNode.getPartialPath();
-                      int level = parentPath.getNodeLength();
-                      int end = parentPath.getNodeLength() - 1;
-                      if (!readWriteHandler.existAnySiblings(
-                          RSchemaUtils.getLevelPathPrefix(parentPath.getNodes(), end, level))) {
-                        try {
-                          readWriteHandler.deleteNode(
-                              parentPath.getNodes(), RSchemaUtils.typeOfMNode(currentNode));
-                          IMNode parentNode = currentNode.getParent();
-                          if (!parentNode.isStorageGroup()) {
-                            tempSet.add(currentNode.getParent());
-                          }
-                        } catch (Exception e) {
-                          logger.warn("delete {} fail.", parentPath.getFullPath(), e);
-                        }
+          Stream<IMNode> parentStream = parentNeedsToCheck.parallelStream();
+          parentStream.forEach(
+              currentNode -> {
+                if (!currentNode.isStorageGroup()) {
+                  PartialPath parentPath = currentNode.getPartialPath();
+                  int level = parentPath.getNodeLength();
+                  int end = parentPath.getNodeLength() - 1;
+                  if (!readWriteHandler.existAnySiblings(
+                      RSchemaUtils.getLevelPathPrefix(parentPath.getNodes(), end, level))) {
+                    try {
+                      readWriteHandler.deleteNode(
+                          parentPath.getNodes(), RSchemaUtils.typeOfMNode(currentNode));
+                      IMNode parentNode = currentNode.getParent();
+                      if (!parentNode.isStorageGroup()) {
+                        tempSet.add(currentNode.getParent());
                       }
+                    } catch (Exception e) {
+                      logger.warn("delete {} fail.", parentPath.getFullPath(), e);
                     }
-                  });
+                  }
+                }
+              });
           parentNeedsToCheck.clear();
           parentNeedsToCheck.addAll(tempSet);
         }
@@ -768,35 +768,33 @@ public class RSchemaRegion implements ISchemaRegion {
       byte[] suffixToMatch =
           RSchemaUtils.getSuffixOfLevelPath(
               ArrayUtils.subarray(nodes, firstNonWildcardIndex, nextFirstWildcardIndex), level);
-
-      scanKeys
-          .parallelStream()
-          .forEach(
-              prefixNodes -> {
-                String levelPrefix =
-                    RSchemaUtils.getLevelPathPrefix(prefixNodes, prefixNodes.length - 1, level);
-                Arrays.stream(nodeType)
-                    .parallel()
-                    .forEach(
-                        x -> {
-                          byte[] startKey = RSchemaUtils.toRocksDBKey(levelPrefix, x);
-                          RocksIterator iterator = readWriteHandler.iterator(null);
-                          iterator.seek(startKey);
-                          while (iterator.isValid()) {
-                            if (!RSchemaUtils.prefixMatch(iterator.key(), startKey)) {
-                              break;
-                            }
-                            if (RSchemaUtils.suffixMatch(iterator.key(), suffixToMatch)) {
-                              if (lastIteration) {
-                                function.apply(iterator.key(), iterator.value());
-                              } else {
-                                tempNodes.add(RSchemaUtils.toMetaNodes(iterator.key()));
-                              }
-                            }
-                            iterator.next();
+      Stream<String[]> scanKeysStream = scanKeys.parallelStream();
+      scanKeysStream.forEach(
+          prefixNodes -> {
+            String levelPrefix =
+                RSchemaUtils.getLevelPathPrefix(prefixNodes, prefixNodes.length - 1, level);
+            Arrays.stream(nodeType)
+                .parallel()
+                .forEach(
+                    x -> {
+                      byte[] startKey = RSchemaUtils.toRocksDBKey(levelPrefix, x);
+                      RocksIterator iterator = readWriteHandler.iterator(null);
+                      iterator.seek(startKey);
+                      while (iterator.isValid()) {
+                        if (!RSchemaUtils.prefixMatch(iterator.key(), startKey)) {
+                          break;
+                        }
+                        if (RSchemaUtils.suffixMatch(iterator.key(), suffixToMatch)) {
+                          if (lastIteration) {
+                            function.apply(iterator.key(), iterator.value());
+                          } else {
+                            tempNodes.add(RSchemaUtils.toMetaNodes(iterator.key()));
                           }
-                        });
-              });
+                        }
+                        iterator.next();
+                      }
+                    });
+          });
       scanKeys.clear();
       scanKeys.addAll(tempNodes);
       tempNodes.clear();
@@ -1204,8 +1202,13 @@ public class RSchemaRegion implements ISchemaRegion {
   }
 
   @Override
-  public List<MeasurementPath> getMeasurementPaths(PartialPath pathPattern, boolean isPrefixMath)
-      throws MetadataException {
+  public List<MeasurementPath> getMeasurementPaths(
+      PartialPath pathPattern, boolean isPrefixMath, boolean withTags) throws MetadataException {
+    if (withTags) {
+      Map<MeasurementPath, Pair<Map<String, String>, Map<String, String>>> results =
+          getMatchedMeasurementPathWithTags(pathPattern.getNodes());
+      return new ArrayList<>(results.keySet());
+    }
     List<MeasurementPath> allResult = Collections.synchronizedList(new ArrayList<>());
     BiFunction<byte[], byte[], Boolean> function =
         (a, b) -> {
@@ -1223,15 +1226,16 @@ public class RSchemaRegion implements ISchemaRegion {
 
   @Override
   public Pair<List<MeasurementPath>, Integer> getMeasurementPathsWithAlias(
-      PartialPath pathPattern, int limit, int offset, boolean isPrefixMatch)
+      PartialPath pathPattern, int limit, int offset, boolean isPrefixMatch, boolean withTags)
       throws MetadataException {
     // todo page query
-    return new Pair<>(getMeasurementPaths(pathPattern, false), offset + limit);
+    return new Pair<>(getMeasurementPaths(pathPattern, false, withTags), offset + limit);
   }
 
   @Override
   public List<MeasurementPath> fetchSchema(
-      PartialPath pathPattern, Map<Integer, Template> templateMap) throws MetadataException {
+      PartialPath pathPattern, Map<Integer, Template> templateMap, boolean withTags)
+      throws MetadataException {
     throw new UnsupportedOperationException();
   }
 
@@ -1277,6 +1281,7 @@ public class RSchemaRegion implements ISchemaRegion {
     return new Pair<>(res, 1);
   }
 
+  @SuppressWarnings("unchecked")
   private Map<MeasurementPath, Pair<Map<String, String>, Map<String, String>>>
       getMatchedMeasurementPathWithTags(String[] nodes) throws IllegalPathException {
     Map<MeasurementPath, Pair<Map<String, String>, Map<String, String>>> allResult =
@@ -1295,10 +1300,12 @@ public class RSchemaRegion implements ISchemaRegion {
           if (!(attributes instanceof Map)) {
             attributes = Collections.emptyMap();
           }
-          @SuppressWarnings("unchecked")
+          Map<String, String> tagMap = (Map<String, String>) tag;
+          Map<String, String> attributesMap = (Map<String, String>) attributes;
           Pair<Map<String, String>, Map<String, String>> tagsAndAttributes =
-              new Pair<>((Map<String, String>) tag, (Map<String, String>) attributes);
+              new Pair<>(tagMap, attributesMap);
           allResult.put(measurementPath, tagsAndAttributes);
+          measurementPath.setTagMap(tagMap);
           return true;
         };
     traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, function, new Character[] {NODE_TYPE_MEASUREMENT});
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
index aebef242f7..7e8f64c4a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
@@ -798,7 +798,7 @@ public class LocalSchemaProcessor {
    */
   public List<MeasurementPath> getMeasurementPaths(PartialPath pathPattern, boolean isPrefixMatch)
       throws MetadataException {
-    return getMeasurementPathsWithAlias(pathPattern, 0, 0, isPrefixMatch).left;
+    return getMeasurementPathsWithAlias(pathPattern, 0, 0, isPrefixMatch, false).left;
   }
 
   /**
@@ -821,7 +821,7 @@ public class LocalSchemaProcessor {
    * @param isPrefixMatch if true, the path pattern is used to match prefix path
    */
   public Pair<List<MeasurementPath>, Integer> getMeasurementPathsWithAlias(
-      PartialPath pathPattern, int limit, int offset, boolean isPrefixMatch)
+      PartialPath pathPattern, int limit, int offset, boolean isPrefixMatch, boolean withTags)
       throws MetadataException {
     List<MeasurementPath> measurementPaths = new LinkedList<>();
     Pair<List<MeasurementPath>, Integer> result;
@@ -836,7 +836,7 @@ public class LocalSchemaProcessor {
       }
       result =
           schemaRegion.getMeasurementPathsWithAlias(
-              pathPattern, tmpLimit, tmpOffset, isPrefixMatch);
+              pathPattern, tmpLimit, tmpOffset, isPrefixMatch, withTags);
       measurementPaths.addAll(result.left);
       resultOffset += result.right;
       if (limit != 0) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
index 551ad101b3..e56b5b8612 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
@@ -109,6 +109,7 @@ public class DataNodeSchemaCache {
             devicePath.concatNode(
                 schemaCacheEntry.getSchemaEntryId()), // the cached path may be alias path
             schemaCacheEntry.getMeasurementSchema(),
+            schemaCacheEntry.getTagMap(),
             null,
             schemaCacheEntry.isAligned());
       }
@@ -121,6 +122,7 @@ public class DataNodeSchemaCache {
       SchemaCacheEntry schemaCacheEntry =
           new SchemaCacheEntry(
               (MeasurementSchema) measurementPath.getMeasurementSchema(),
+              measurementPath.getTagMap(),
               measurementPath.isUnderAlignedEntity());
       cache.put(new PartialPath(measurementPath.getNodes()), schemaCacheEntry);
     }
@@ -168,6 +170,7 @@ public class DataNodeSchemaCache {
           entry =
               new SchemaCacheEntry(
                   (MeasurementSchema) measurementPath.getMeasurementSchema(),
+                  measurementPath.getTagMap(),
                   measurementPath.isUnderAlignedEntity());
           cache.put(seriesPath, entry);
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
index 03ddf17360..9b0fa929d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
@@ -24,17 +24,22 @@ import org.apache.iotdb.db.metadata.lastCache.container.LastCacheContainer;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
+import java.util.Map;
+
 public class SchemaCacheEntry {
 
   private final MeasurementSchema measurementSchema;
 
+  private final Map<String, String> tagMap;
   private final boolean isAligned;
 
   private volatile ILastCacheContainer lastCacheContainer = null;
 
-  SchemaCacheEntry(MeasurementSchema measurementSchema, boolean isAligned) {
+  SchemaCacheEntry(
+      MeasurementSchema measurementSchema, Map<String, String> tagMap, boolean isAligned) {
     this.measurementSchema = measurementSchema;
     this.isAligned = isAligned;
+    this.tagMap = tagMap;
   }
 
   public String getSchemaEntryId() {
@@ -45,6 +50,10 @@ public class SchemaCacheEntry {
     return measurementSchema;
   }
 
+  public Map<String, String> getTagMap() {
+    return tagMap;
+  }
+
   public TSDataType getTsDataType() {
     return measurementSchema.getType();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/IMTreeBelowSG.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/IMTreeBelowSG.java
index 25eead8800..bc57c0bd21 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/IMTreeBelowSG.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/IMTreeBelowSG.java
@@ -136,12 +136,15 @@ public interface IMTreeBelowSG {
    * collected and return.
    *
    * @param pathPattern a path pattern or a full path, may contain wildcard
+   * @param limit the maximum number of results to return; 0 means no limit.
+   * @param offset the number of matched results to skip before collecting.
    * @param isPrefixMatch if true, the path pattern is used to match prefix path
+   * @param withTags whether to return all the tags of each timeseries as well.
    * @return Pair.left contains all the satisfied path Pair.right means the current offset or zero
    *     if we don't set offset.
    */
   Pair<List<MeasurementPath>, Integer> getMeasurementPathsWithAlias(
-      PartialPath pathPattern, int limit, int offset, boolean isPrefixMatch)
+      PartialPath pathPattern, int limit, int offset, boolean isPrefixMatch, boolean withTags)
       throws MetadataException;
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
index b37288aff6..7fd682dd75 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
@@ -77,6 +77,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.function.Function;
 import java.util.stream.Stream;
 
 import static java.util.stream.Collectors.toList;
@@ -114,11 +115,16 @@ public class MTreeBelowSGCachedImpl implements IMTreeBelowSG {
 
   private CachedMTreeStore store;
   private volatile IStorageGroupMNode storageGroupMNode;
+  private final Function<IMeasurementMNode, Map<String, String>> tagGetter;
   private int levelOfSG;
 
   // region MTree initialization, clear and serialization
-  public MTreeBelowSGCachedImpl(IStorageGroupMNode storageGroupMNode, int schemaRegionId)
+  public MTreeBelowSGCachedImpl(
+      IStorageGroupMNode storageGroupMNode,
+      Function<IMeasurementMNode, Map<String, String>> tagGetter,
+      int schemaRegionId)
       throws MetadataException, IOException {
+    this.tagGetter = tagGetter;
     PartialPath storageGroup = storageGroupMNode.getPartialPath();
     store = new CachedMTreeStore(storageGroup, schemaRegionId);
     this.storageGroupMNode = store.getRoot().getAsStorageGroupMNode();
@@ -631,7 +637,7 @@ public class MTreeBelowSGCachedImpl implements IMTreeBelowSG {
   @Override
   public List<MeasurementPath> getMeasurementPaths(PartialPath pathPattern, boolean isPrefixMatch)
       throws MetadataException {
-    return getMeasurementPathsWithAlias(pathPattern, 0, 0, isPrefixMatch).left;
+    return getMeasurementPathsWithAlias(pathPattern, 0, 0, isPrefixMatch, false).left;
   }
 
   /**
@@ -657,7 +663,7 @@ public class MTreeBelowSGCachedImpl implements IMTreeBelowSG {
    */
   @Override
   public Pair<List<MeasurementPath>, Integer> getMeasurementPathsWithAlias(
-      PartialPath pathPattern, int limit, int offset, boolean isPrefixMatch)
+      PartialPath pathPattern, int limit, int offset, boolean isPrefixMatch, boolean withTags)
       throws MetadataException {
     List<MeasurementPath> result = new LinkedList<>();
     MeasurementCollector<List<PartialPath>> collector =
@@ -670,6 +676,9 @@ public class MTreeBelowSGCachedImpl implements IMTreeBelowSG {
               // only when user query with alias, the alias in path will be set
               path.setMeasurementAlias(node.getAlias());
             }
+            if (withTags) {
+              path.setTagMap(tagGetter.apply(node));
+            }
             result.add(path);
           }
         };
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
index 3a05d885c4..e338665e45 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
@@ -83,6 +83,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.stream.Stream;
 
 import static java.util.stream.Collectors.toList;
@@ -123,23 +124,32 @@ public class MTreeBelowSGMemoryImpl implements IMTreeBelowSG {
   // this implementation is based on memory, thus only MTree write operation must invoke MTreeStore
   private MemMTreeStore store;
   private volatile IStorageGroupMNode storageGroupMNode;
+  private final Function<IMeasurementMNode, Map<String, String>> tagGetter;
   private int levelOfSG;
 
   // region MTree initialization, clear and serialization
-  public MTreeBelowSGMemoryImpl(IStorageGroupMNode storageGroupMNode, int schemaRegionId) {
+  public MTreeBelowSGMemoryImpl(
+      IStorageGroupMNode storageGroupMNode,
+      Function<IMeasurementMNode, Map<String, String>> tagGetter,
+      int schemaRegionId) {
     PartialPath storageGroup = storageGroupMNode.getPartialPath();
     store = new MemMTreeStore(storageGroup, true);
     this.storageGroupMNode = store.getRoot().getAsStorageGroupMNode();
     this.storageGroupMNode.setParent(storageGroupMNode.getParent());
     levelOfSG = storageGroup.getNodeLength() - 1;
+    this.tagGetter = tagGetter;
   }
 
   private MTreeBelowSGMemoryImpl(
-      MemMTreeStore store, IStorageGroupMNode storageGroupMNode, int schemaRegionId) {
+      MemMTreeStore store,
+      IStorageGroupMNode storageGroupMNode,
+      Function<IMeasurementMNode, Map<String, String>> tagGetter,
+      int schemaRegionId) {
     this.store = store;
     this.storageGroupMNode = store.getRoot().getAsStorageGroupMNode();
     this.storageGroupMNode.setParent(storageGroupMNode.getParent());
     levelOfSG = storageGroupMNode.getPartialPath().getNodeLength() - 1;
+    this.tagGetter = tagGetter;
   }
 
   @Override
@@ -156,11 +166,13 @@ public class MTreeBelowSGMemoryImpl implements IMTreeBelowSG {
       File snapshotDir,
       IStorageGroupMNode storageGroupMNode,
       int schemaRegionId,
-      Consumer<IMeasurementMNode> measurementProcess)
+      Consumer<IMeasurementMNode> measurementProcess,
+      Function<IMeasurementMNode, Map<String, String>> tagGetter)
       throws IOException {
     return new MTreeBelowSGMemoryImpl(
         MemMTreeStore.loadFromSnapshot(snapshotDir, measurementProcess),
         storageGroupMNode,
+        tagGetter,
         schemaRegionId);
   }
 
@@ -714,7 +726,7 @@ public class MTreeBelowSGMemoryImpl implements IMTreeBelowSG {
   @Override
   public List<MeasurementPath> getMeasurementPaths(PartialPath pathPattern, boolean isPrefixMatch)
       throws MetadataException {
-    return getMeasurementPathsWithAlias(pathPattern, 0, 0, isPrefixMatch).left;
+    return getMeasurementPathsWithAlias(pathPattern, 0, 0, isPrefixMatch, false).left;
   }
 
   /**
@@ -740,7 +752,7 @@ public class MTreeBelowSGMemoryImpl implements IMTreeBelowSG {
    */
   @Override
   public Pair<List<MeasurementPath>, Integer> getMeasurementPathsWithAlias(
-      PartialPath pathPattern, int limit, int offset, boolean isPrefixMatch)
+      PartialPath pathPattern, int limit, int offset, boolean isPrefixMatch, boolean withTags)
       throws MetadataException {
     List<MeasurementPath> result = new LinkedList<>();
     MeasurementCollector<List<PartialPath>> collector =
@@ -753,6 +765,9 @@ public class MTreeBelowSGMemoryImpl implements IMTreeBelowSG {
               // only when user query with alias, the alias in path will be set
               path.setMeasurementAlias(node.getAlias());
             }
+            if (withTags) {
+              path.setTagMap(tagGetter.apply(node));
+            }
             result.add(path);
           }
         };
@@ -763,7 +778,8 @@ public class MTreeBelowSGMemoryImpl implements IMTreeBelowSG {
   }
 
   public List<MeasurementPath> fetchSchema(
-      PartialPath pathPattern, Map<Integer, Template> templateMap) throws MetadataException {
+      PartialPath pathPattern, Map<Integer, Template> templateMap, boolean withTags)
+      throws MetadataException {
     List<MeasurementPath> result = new LinkedList<>();
     MeasurementCollector<List<PartialPath>> collector =
         new MeasurementCollector<List<PartialPath>>(storageGroupMNode, pathPattern, store) {
@@ -777,6 +793,9 @@ public class MTreeBelowSGMemoryImpl implements IMTreeBelowSG {
               // only when user query with alias, the alias in path will be set
               path.setMeasurementAlias(node.getAlias());
             }
+            if (withTags) {
+              path.setTagMap(tagGetter.apply(node));
+            }
             result.add(path);
           }
         };
@@ -1386,6 +1405,7 @@ public class MTreeBelowSGMemoryImpl implements IMTreeBelowSG {
    * @return index on full path of the node which matches all measurements' path with its
    *     upperTemplate.
    */
+  @Override
   public int getMountedNodeIndexOnMeasurementPath(PartialPath devicePath, String[] measurements)
       throws MetadataException {
     String[] nodes = devicePath.getNodes();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
index 64ced4db0e..77d1b5e81f 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
@@ -290,9 +290,10 @@ public interface ISchemaRegion {
    *
    * @param pathPattern can be a pattern or a full path of timeseries.
    * @param isPrefixMatch if true, the path pattern is used to match prefix path
+   * @param withTags whether to return tag key-value pairs in the result list.
    */
-  List<MeasurementPath> getMeasurementPaths(PartialPath pathPattern, boolean isPrefixMatch)
-      throws MetadataException;
+  List<MeasurementPath> getMeasurementPaths(
+      PartialPath pathPattern, boolean isPrefixMatch, boolean withTags) throws MetadataException;
 
   /**
    * Similar to method getMeasurementPaths(), but return Path with alias and filter the result by
@@ -302,10 +303,11 @@ public interface ISchemaRegion {
    * @param isPrefixMatch if true, the path pattern is used to match prefix path
    */
   Pair<List<MeasurementPath>, Integer> getMeasurementPathsWithAlias(
-      PartialPath pathPattern, int limit, int offset, boolean isPrefixMatch)
+      PartialPath pathPattern, int limit, int offset, boolean isPrefixMatch, boolean withTags)
       throws MetadataException;
 
-  List<MeasurementPath> fetchSchema(PartialPath pathPattern, Map<Integer, Template> templateMap)
+  List<MeasurementPath> fetchSchema(
+      PartialPath pathPattern, Map<Integer, Template> templateMap, boolean withTags)
       throws MetadataException;
 
   Pair<List<ShowTimeSeriesResult>, Integer> showTimeseries(
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index 4b02c508fc..ed35f0e642 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -228,6 +228,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     init();
   }
 
+  @Override
   @SuppressWarnings("squid:S2093")
   public synchronized void init() throws MetadataException {
     if (initialized) {
@@ -243,7 +244,9 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
       isRecovering = true;
 
       tagManager = new TagManager(schemaRegionDirPath);
-      mtree = new MTreeBelowSGMemoryImpl(storageGroupMNode, schemaRegionId.getId());
+      mtree =
+          new MTreeBelowSGMemoryImpl(
+              storageGroupMNode, tagManager::readTags, schemaRegionId.getId());
 
       if (!(config.isClusterMode()
           && config
@@ -304,6 +307,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     }
   }
 
+  @Override
   public void forceMlog() {
     if (!initialized) {
       return;
@@ -462,14 +466,17 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
 
   // region Interfaces for schema region Info query and operation
 
+  @Override
   public String getStorageGroupFullPath() {
     return storageGroupFullPath;
   }
 
+  @Override
   public SchemaRegionId getSchemaRegionId() {
     return schemaRegionId;
   }
 
+  @Override
   public synchronized void deleteSchemaRegion() throws MetadataException {
     // collect all the LeafMNode in this schema region
     List<IMeasurementMNode> leafMNodes = mtree.getAllMeasurementMNode();
@@ -556,7 +563,8 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
                       storageGroupFullPath + PATH_SEPARATOR + measurementMNode.getFullPath(),
                       schemaRegionId);
                 }
-              });
+              },
+              tagManager::readTags);
       logger.info(
           "MTree snapshot loading of schemaRegion {} costs {}ms.",
           schemaRegionId,
@@ -598,6 +606,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     createTimeseries(plan, -1);
   }
 
+  @Override
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException {
     if (!memoryStatistics.isAllowToCreateNewSeries()) {
@@ -678,6 +687,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
    *
    * @param plan CreateAlignedTimeSeriesPlan
    */
+  @Override
   public void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws MetadataException {
     int seriesCount = plan.getMeasurements().size();
     if (!memoryStatistics.isAllowToCreateNewSeries()) {
@@ -793,6 +803,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
    * @param isPrefixMatch if true, the path pattern is used to match prefix path
    * @return deletion failed Timeseries
    */
+  @Override
   public synchronized Pair<Integer, Set<String>> deleteTimeseries(
       PartialPath pathPattern, boolean isPrefixMatch) throws MetadataException {
     try {
@@ -984,6 +995,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     return node;
   }
 
+  @Override
   public void autoCreateDeviceMNode(AutoCreateDeviceMNodePlan plan) throws MetadataException {
     mtree.getDeviceNodeWithAutoCreating(plan.getPath());
     try {
@@ -1000,6 +1012,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
    *
    * @param path a full path or a prefix path
    */
+  @Override
   public boolean isPathExist(PartialPath path) {
     try {
       return mtree.isPathExist(path);
@@ -1016,6 +1029,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
    * path, may contain wildcard. If using prefix match, the path pattern is used to match prefix
    * path. All timeseries start with the matched prefix path will be counted.
    */
+  @Override
   public int getAllTimeseriesCount(PartialPath pathPattern, boolean isPrefixMatch)
       throws MetadataException {
     return mtree.getAllTimeseriesCount(pathPattern, isPrefixMatch);
@@ -1044,6 +1058,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
    * pattern is used to match prefix path. All timeseries start with the matched prefix path will be
    * counted.
    */
+  @Override
   public int getDevicesNum(PartialPath pathPattern, boolean isPrefixMatch)
       throws MetadataException {
     return mtree.getDevicesNum(pathPattern, isPrefixMatch);
@@ -1063,11 +1078,13 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
    * @param level the level should match the level of the path
    * @param isPrefixMatch if true, the path pattern is used to match prefix path
    */
+  @Override
   public int getNodesCountInGivenLevel(PartialPath pathPattern, int level, boolean isPrefixMatch)
       throws MetadataException {
     return mtree.getNodesCountInGivenLevel(pathPattern, level, isPrefixMatch);
   }
 
+  @Override
   public Map<PartialPath, Integer> getMeasurementCountGroupByLevel(
       PartialPath pathPattern, int level, boolean isPrefixMatch) throws MetadataException {
     return mtree.getMeasurementCountGroupByLevel(pathPattern, level, isPrefixMatch);
@@ -1093,6 +1110,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
   // endregion
 
   // region Interfaces for level Node info Query
+  @Override
   public List<PartialPath> getNodesListInGivenLevel(
       PartialPath pathPattern,
       int nodeLevel,
@@ -1113,6 +1131,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
    * @param pathPattern The given path
    * @return All child nodes' seriesPath(s) of given seriesPath.
    */
+  @Override
   public Set<TSchemaNode> getChildNodePathInNextLevel(PartialPath pathPattern)
       throws MetadataException {
     return mtree.getChildNodePathInNextLevel(pathPattern);
@@ -1128,6 +1147,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
    *
    * @return All child nodes of given seriesPath.
    */
+  @Override
   public Set<String> getChildNodeNameInNextLevel(PartialPath pathPattern) throws MetadataException {
     return mtree.getChildNodeNameInNextLevel(pathPattern);
   }
@@ -1142,6 +1162,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
    * @param timeseries a path pattern of the target timeseries
    * @return A HashSet instance which stores devices paths.
    */
+  @Override
   public Set<PartialPath> getBelongedDevices(PartialPath timeseries) throws MetadataException {
     return mtree.getDevicesByTimeseries(timeseries);
   }
@@ -1154,6 +1175,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
    * @param isPrefixMatch if true, the path pattern is used to match prefix path.
    * @return A HashSet instance which stores devices paths matching the given path pattern.
    */
+  @Override
   public Set<PartialPath> getMatchedDevices(PartialPath pathPattern, boolean isPrefixMatch)
       throws MetadataException {
     return mtree.getDevices(pathPattern, isPrefixMatch);
@@ -1165,6 +1187,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
    * @param plan ShowDevicesPlan which contains the path pattern and restriction params.
    * @return ShowDevicesResult and the current offset of this region after traverse.
    */
+  @Override
   public Pair<List<ShowDevicesResult>, Integer> getMatchedDevices(ShowDevicesPlan plan)
       throws MetadataException {
     return mtree.getDevices(plan);
@@ -1182,9 +1205,10 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
    * @param pathPattern can be a pattern or a full path of timeseries.
    * @param isPrefixMatch if true, the path pattern is used to match prefix path
    */
-  public List<MeasurementPath> getMeasurementPaths(PartialPath pathPattern, boolean isPrefixMatch)
-      throws MetadataException {
-    return getMeasurementPathsWithAlias(pathPattern, 0, 0, isPrefixMatch).left;
+  @Override
+  public List<MeasurementPath> getMeasurementPaths(
+      PartialPath pathPattern, boolean isPrefixMatch, boolean withTags) throws MetadataException {
+    return getMeasurementPathsWithAlias(pathPattern, 0, 0, isPrefixMatch, withTags).left;
   }
 
   /**
@@ -1196,7 +1220,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
    */
   public List<MeasurementPath> getMeasurementPaths(PartialPath pathPattern)
       throws MetadataException {
-    return getMeasurementPaths(pathPattern, false);
+    return getMeasurementPaths(pathPattern, false, false);
   }
 
   /**
@@ -1206,18 +1230,21 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
    *
    * @param isPrefixMatch if true, the path pattern is used to match prefix path
    */
+  @Override
   public Pair<List<MeasurementPath>, Integer> getMeasurementPathsWithAlias(
-      PartialPath pathPattern, int limit, int offset, boolean isPrefixMatch)
+      PartialPath pathPattern, int limit, int offset, boolean isPrefixMatch, boolean withTags)
       throws MetadataException {
-    return mtree.getMeasurementPathsWithAlias(pathPattern, limit, offset, isPrefixMatch);
+    return mtree.getMeasurementPathsWithAlias(pathPattern, limit, offset, isPrefixMatch, withTags);
   }
 
   @Override
   public List<MeasurementPath> fetchSchema(
-      PartialPath pathPattern, Map<Integer, Template> templateMap) throws MetadataException {
-    return mtree.fetchSchema(pathPattern, templateMap);
+      PartialPath pathPattern, Map<Integer, Template> templateMap, boolean withTags)
+      throws MetadataException {
+    return mtree.fetchSchema(pathPattern, templateMap, withTags);
   }
 
+  @Override
   public Pair<List<ShowTimeSeriesResult>, Integer> showTimeseries(
       ShowTimeSeriesPlan plan, QueryContext context) throws MetadataException {
     // show timeseries with index
@@ -1362,6 +1389,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
   }
 
   // attention: this path must be a device node
+  @Override
   public List<MeasurementPath> getAllMeasurementByDevicePath(PartialPath devicePath)
       throws PathNotExistException {
     List<MeasurementPath> res = new LinkedList<>();
@@ -1399,6 +1427,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
   // endregion
 
   // region Interfaces and methods for MNode query
+  @Override
   public IMNode getDeviceNode(PartialPath path) throws MetadataException {
     IMNode node;
     try {
@@ -1412,6 +1441,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     }
   }
 
+  @Override
   public IMeasurementMNode getMeasurementMNode(PartialPath fullPath) throws MetadataException {
     return mtree.getMeasurementMNode(fullPath);
   }
@@ -1458,6 +1488,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     }
   }
 
+  @Override
   public void changeAlias(PartialPath path, String alias) throws MetadataException {
     IMeasurementMNode leafMNode = mtree.getMeasurementMNode(path);
     if (leafMNode.getAlias() != null) {
@@ -1482,6 +1513,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
    * @param attributesMap newly added attributes map
    * @param fullPath timeseries
    */
+  @Override
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void upsertTagsAndAttributes(
       String alias,
@@ -1537,6 +1569,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
    * @param attributesMap newly added attributes map
    * @param fullPath timeseries
    */
+  @Override
   public void addAttributes(Map<String, String> attributesMap, PartialPath fullPath)
       throws MetadataException, IOException {
     IMeasurementMNode leafMNode = mtree.getMeasurementMNode(fullPath);
@@ -1558,6 +1591,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
    * @param tagsMap newly added tags map
    * @param fullPath timeseries
    */
+  @Override
   public void addTags(Map<String, String> tagsMap, PartialPath fullPath)
       throws MetadataException, IOException {
     IMeasurementMNode leafMNode = mtree.getMeasurementMNode(fullPath);
@@ -1580,6 +1614,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
    * @param keySet tags key or attributes key
    * @param fullPath timeseries path
    */
+  @Override
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void dropTagsOrAttributes(Set<String> keySet, PartialPath fullPath)
       throws MetadataException, IOException {
@@ -1596,6 +1631,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
    * @param alterMap the new tags or attributes key-value
    * @param fullPath timeseries
    */
+  @Override
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void setTagsOrAttributesValue(Map<String, String> alterMap, PartialPath fullPath)
       throws MetadataException, IOException {
@@ -1616,6 +1652,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
    * @param newKey new key of tag or attribute
    * @param fullPath timeseries
    */
+  @Override
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void renameTagOrAttributeKey(String oldKey, String newKey, PartialPath fullPath)
       throws MetadataException, IOException {
@@ -1638,6 +1675,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
 
   // region Interfaces and Implementation for InsertPlan process
   /** get schema for device. Attention!!! Only support insertPlan */
+  @Override
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public IMNode getSeriesSchemasAndReadLockDevice(InsertPlan plan)
       throws MetadataException, IOException {
@@ -1949,19 +1987,23 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
    * @param templateName designated template name, blank string for any template exists
    * @return paths set
    */
+  @Override
   public Set<String> getPathsSetTemplate(String templateName) throws MetadataException {
     return new HashSet<>(mtree.getPathsSetOnTemplate(templateName));
   }
 
+  @Override
   public Set<String> getPathsUsingTemplate(String templateName) throws MetadataException {
     return new HashSet<>(mtree.getPathsUsingTemplate(templateName));
   }
 
+  @Override
   public boolean isTemplateAppendable(Template template, List<String> measurements)
       throws MetadataException {
     return mtree.isTemplateAppendable(template, measurements);
   }
 
+  @Override
   public synchronized void setSchemaTemplate(SetTemplatePlan plan) throws MetadataException {
     // get mnode and update template should be atomic
     Template template = TemplateManager.getInstance().getTemplate(plan.getTemplateName());
@@ -1987,6 +2029,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     }
   }
 
+  @Override
   public synchronized void unsetSchemaTemplate(UnsetTemplatePlan plan) throws MetadataException {
     // get mnode should be atomic
     try {
@@ -2011,6 +2054,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     }
   }
 
+  @Override
   public void setUsingSchemaTemplate(ActivateTemplatePlan plan) throws MetadataException {
     // check whether any template has been set on designated path
     if (mtree.getTemplateOnPath(plan.getPrefixPath()) == null) {
@@ -2108,10 +2152,12 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
 
   // region Interfaces for Trigger
 
+  @Override
   public IMNode getMNodeForTrigger(PartialPath fullPath) throws MetadataException {
     return mtree.getNodeByPath(fullPath);
   }
 
+  @Override
   public void releaseMNodeAfterDropTrigger(IMNode node) throws MetadataException {
     // do nothing
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index 7f7efe56b7..fbe6523ac3 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -211,6 +211,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
     init();
   }
 
+  @Override
   @SuppressWarnings("squid:S2093")
   public synchronized void init() throws MetadataException {
     if (initialized) {
@@ -256,7 +257,9 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
       isRecovering = true;
 
       tagManager = new TagManager(schemaRegionDirPath);
-      mtree = new MTreeBelowSGCachedImpl(storageGroupMNode, schemaRegionId.getId());
+      mtree =
+          new MTreeBelowSGCachedImpl(
+              storageGroupMNode, tagManager::readTags, schemaRegionId.getId());
 
       int lineNumber = initFromLog(logFile);
 
@@ -272,6 +275,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
     initialized = true;
   }
 
+  @Override
   public void forceMlog() {
     if (!initialized) {
       return;
@@ -409,14 +413,17 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
 
   // region Interfaces for schema region Info query and operation
 
+  @Override
   public String getStorageGroupFullPath() {
     return storageGroupFullPath;
   }
 
+  @Override
   public SchemaRegionId getSchemaRegionId() {
     return schemaRegionId;
   }
 
+  @Override
   public synchronized void deleteSchemaRegion() throws MetadataException {
     // collect all the LeafMNode in this schema region
     List<IMeasurementMNode> leafMNodes = mtree.getAllMeasurementMNode();
@@ -477,6 +484,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
     }
   }
 
+  @Override
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException {
     if (!memoryStatistics.isAllowToCreateNewSeries()) {
@@ -623,6 +631,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
    *
    * @param plan CreateAlignedTimeSeriesPlan
    */
+  @Override
   public void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws MetadataException {
     int seriesCount = plan.getMeasurements().size();
     if (!memoryStatistics.isAllowToCreateNewSeries()) {
@@ -744,6 +753,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
    * @param isPrefixMatch if true, the path pattern is used to match prefix path
    * @return deletion failed Timeseries
    */
+  @Override
   public synchronized Pair<Integer, Set<String>> deleteTimeseries(
       PartialPath pathPattern, boolean isPrefixMatch) throws MetadataException {
     try {
@@ -878,6 +888,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
     return node;
   }
 
+  @Override
   public void autoCreateDeviceMNode(AutoCreateDeviceMNodePlan plan) throws MetadataException {
     IMNode node = mtree.getDeviceNodeWithAutoCreating(plan.getPath());
     mtree.unPinMNode(node);
@@ -897,6 +908,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
    *
    * @param path a full path or a prefix path
    */
+  @Override
   public boolean isPathExist(PartialPath path) {
     try {
       return mtree.isPathExist(path);
@@ -913,6 +925,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
    * path, may contain wildcard. If using prefix match, the path pattern is used to match prefix
    * path. All timeseries start with the matched prefix path will be counted.
    */
+  @Override
   public int getAllTimeseriesCount(PartialPath pathPattern, boolean isPrefixMatch)
       throws MetadataException {
     return mtree.getAllTimeseriesCount(pathPattern, isPrefixMatch);
@@ -941,6 +954,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
    * pattern is used to match prefix path. All timeseries start with the matched prefix path will be
    * counted.
    */
+  @Override
   public int getDevicesNum(PartialPath pathPattern, boolean isPrefixMatch)
       throws MetadataException {
     return mtree.getDevicesNum(pathPattern, isPrefixMatch);
@@ -960,11 +974,13 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
    * @param level the level should match the level of the path
    * @param isPrefixMatch if true, the path pattern is used to match prefix path
    */
+  @Override
   public int getNodesCountInGivenLevel(PartialPath pathPattern, int level, boolean isPrefixMatch)
       throws MetadataException {
     return mtree.getNodesCountInGivenLevel(pathPattern, level, isPrefixMatch);
   }
 
+  @Override
   public Map<PartialPath, Integer> getMeasurementCountGroupByLevel(
       PartialPath pathPattern, int level, boolean isPrefixMatch) throws MetadataException {
     return mtree.getMeasurementCountGroupByLevel(pathPattern, level, isPrefixMatch);
@@ -990,6 +1006,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
   // endregion
 
   // region Interfaces for level Node info Query
+  @Override
   public List<PartialPath> getNodesListInGivenLevel(
       PartialPath pathPattern,
       int nodeLevel,
@@ -1010,6 +1027,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
    * @param pathPattern The given path
    * @return All child nodes' seriesPath(s) of given seriesPath.
    */
+  @Override
   public Set<TSchemaNode> getChildNodePathInNextLevel(PartialPath pathPattern)
       throws MetadataException {
     return mtree.getChildNodePathInNextLevel(pathPattern);
@@ -1025,6 +1043,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
    *
    * @return All child nodes of given seriesPath.
    */
+  @Override
   public Set<String> getChildNodeNameInNextLevel(PartialPath pathPattern) throws MetadataException {
     return mtree.getChildNodeNameInNextLevel(pathPattern);
   }
@@ -1039,6 +1058,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
    * @param timeseries a path pattern of the target timeseries
    * @return A HashSet instance which stores devices paths.
    */
+  @Override
   public Set<PartialPath> getBelongedDevices(PartialPath timeseries) throws MetadataException {
     return mtree.getDevicesByTimeseries(timeseries);
   }
@@ -1051,6 +1071,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
    * @param isPrefixMatch if true, the path pattern is used to match prefix path.
    * @return A HashSet instance which stores devices paths matching the given path pattern.
    */
+  @Override
   public Set<PartialPath> getMatchedDevices(PartialPath pathPattern, boolean isPrefixMatch)
       throws MetadataException {
     return mtree.getDevices(pathPattern, isPrefixMatch);
@@ -1062,6 +1083,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
    * @param plan ShowDevicesPlan which contains the path pattern and restriction params.
    * @return ShowDevicesResult and the current offset of this region after traverse.
    */
+  @Override
   public Pair<List<ShowDevicesResult>, Integer> getMatchedDevices(ShowDevicesPlan plan)
       throws MetadataException {
     return mtree.getDevices(plan);
@@ -1079,9 +1101,10 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
    * @param pathPattern can be a pattern or a full path of timeseries.
    * @param isPrefixMatch if true, the path pattern is used to match prefix path
    */
-  public List<MeasurementPath> getMeasurementPaths(PartialPath pathPattern, boolean isPrefixMatch)
-      throws MetadataException {
-    return getMeasurementPathsWithAlias(pathPattern, 0, 0, isPrefixMatch).left;
+  @Override
+  public List<MeasurementPath> getMeasurementPaths(
+      PartialPath pathPattern, boolean isPrefixMatch, boolean withTags) throws MetadataException {
+    return getMeasurementPathsWithAlias(pathPattern, 0, 0, isPrefixMatch, withTags).left;
   }
 
   /**
@@ -1091,18 +1114,21 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
    *
    * @param isPrefixMatch if true, the path pattern is used to match prefix path
    */
+  @Override
   public Pair<List<MeasurementPath>, Integer> getMeasurementPathsWithAlias(
-      PartialPath pathPattern, int limit, int offset, boolean isPrefixMatch)
+      PartialPath pathPattern, int limit, int offset, boolean isPrefixMatch, boolean withTags)
       throws MetadataException {
-    return mtree.getMeasurementPathsWithAlias(pathPattern, limit, offset, isPrefixMatch);
+    return mtree.getMeasurementPathsWithAlias(pathPattern, limit, offset, isPrefixMatch, withTags);
   }
 
   @Override
   public List<MeasurementPath> fetchSchema(
-      PartialPath pathPattern, Map<Integer, Template> templateMap) throws MetadataException {
+      PartialPath pathPattern, Map<Integer, Template> templateMap, boolean withTags)
+      throws MetadataException {
     throw new UnsupportedOperationException();
   }
 
+  @Override
   public Pair<List<ShowTimeSeriesResult>, Integer> showTimeseries(
       ShowTimeSeriesPlan plan, QueryContext context) throws MetadataException {
     // show timeseries with index
@@ -1247,6 +1273,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
   }
 
   // attention: this path must be a device node
+  @Override
   public List<MeasurementPath> getAllMeasurementByDevicePath(PartialPath devicePath)
       throws PathNotExistException {
     List<MeasurementPath> res = new LinkedList<>();
@@ -1285,6 +1312,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
 
   // region Interfaces and methods for MNode query
 
+  @Override
   public IMNode getDeviceNode(PartialPath path) throws MetadataException {
     IMNode node;
     try {
@@ -1298,6 +1326,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
     }
   }
 
+  @Override
   public IMeasurementMNode getMeasurementMNode(PartialPath fullPath) throws MetadataException {
     IMeasurementMNode measurementMNode = mtree.getMeasurementMNode(fullPath);
     mtree.unPinMNode(measurementMNode);
@@ -1355,6 +1384,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
     }
   }
 
+  @Override
   public void changeAlias(PartialPath path, String alias) throws MetadataException {
     IMeasurementMNode leafMNode = mtree.getMeasurementMNode(path);
     try {
@@ -1385,6 +1415,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
    * @param attributesMap newly added attributes map
    * @param fullPath timeseries
    */
+  @Override
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void upsertTagsAndAttributes(
       String alias,
@@ -1445,6 +1476,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
    * @param attributesMap newly added attributes map
    * @param fullPath timeseries
    */
+  @Override
   public void addAttributes(Map<String, String> attributesMap, PartialPath fullPath)
       throws MetadataException, IOException {
     IMeasurementMNode leafMNode = mtree.getMeasurementMNode(fullPath);
@@ -1470,6 +1502,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
    * @param tagsMap newly added tags map
    * @param fullPath timeseries
    */
+  @Override
   public void addTags(Map<String, String> tagsMap, PartialPath fullPath)
       throws MetadataException, IOException {
     IMeasurementMNode leafMNode = mtree.getMeasurementMNode(fullPath);
@@ -1498,6 +1531,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
    * @param keySet tags key or attributes key
    * @param fullPath timeseries path
    */
+  @Override
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void dropTagsOrAttributes(Set<String> keySet, PartialPath fullPath)
       throws MetadataException, IOException {
@@ -1520,6 +1554,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
    * @param alterMap the new tags or attributes key-value
    * @param fullPath timeseries
    */
+  @Override
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void setTagsOrAttributesValue(Map<String, String> alterMap, PartialPath fullPath)
       throws MetadataException, IOException {
@@ -1544,6 +1579,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
    * @param newKey new key of tag or attribute
    * @param fullPath timeseries
    */
+  @Override
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void renameTagOrAttributeKey(String oldKey, String newKey, PartialPath fullPath)
       throws MetadataException, IOException {
@@ -1570,6 +1606,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
 
   // region Interfaces and Implementation for InsertPlan process
   /** get schema for device. Attention!!! Only support insertPlan */
+  @Override
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public IMNode getSeriesSchemasAndReadLockDevice(InsertPlan plan)
       throws MetadataException, IOException {
@@ -1811,19 +1848,23 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
    * @param templateName designated template name, blank string for any template exists
    * @return paths set
    */
+  @Override
   public Set<String> getPathsSetTemplate(String templateName) throws MetadataException {
     return new HashSet<>(mtree.getPathsSetOnTemplate(templateName));
   }
 
+  @Override
   public Set<String> getPathsUsingTemplate(String templateName) throws MetadataException {
     return new HashSet<>(mtree.getPathsUsingTemplate(templateName));
   }
 
+  @Override
   public boolean isTemplateAppendable(Template template, List<String> measurements)
       throws MetadataException {
     return mtree.isTemplateAppendable(template, measurements);
   }
 
+  @Override
   public synchronized void setSchemaTemplate(SetTemplatePlan plan) throws MetadataException {
     // get mnode and update template should be atomic
     Template template = TemplateManager.getInstance().getTemplate(plan.getTemplateName());
@@ -1856,6 +1897,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
     }
   }
 
+  @Override
   public synchronized void unsetSchemaTemplate(UnsetTemplatePlan plan) throws MetadataException {
     // get mnode should be atomic
     try {
@@ -1882,6 +1924,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
     }
   }
 
+  @Override
   public void setUsingSchemaTemplate(ActivateTemplatePlan plan) throws MetadataException {
     // check whether any template has been set on designated path
     if (mtree.getTemplateOnPath(plan.getPrefixPath()) == null) {
@@ -1960,10 +2003,12 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
 
   // region Interfaces for Trigger
 
+  @Override
   public IMNode getMNodeForTrigger(PartialPath fullPath) throws MetadataException {
     return mtree.getNodeByPath(fullPath);
   }
 
+  @Override
   public void releaseMNodeAfterDropTrigger(IMNode node) throws MetadataException {
     mtree.unPinMNode(node);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java
index 7c6eecd1d3..a8029aed74 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java
@@ -655,6 +655,21 @@ public class TagManager {
     return tagLogFile.read(config.getTagAttributeTotalSize(), tagFileOffset);
   }
 
+  /**
+   * Read the tags of this node.
+   *
+   * @param node the node to query.
+   * @return the tag key-value map.
+   * @throws RuntimeException If any IOException happens.
+   */
+  public Map<String, String> readTags(IMeasurementMNode node) {
+    try {
+      return readTagFile(node.getOffset()).getLeft();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   public void clear() throws IOException {
     this.tagIndex.clear();
     if (tagLogFile != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
index 4182a1cb29..3675f85f81 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
@@ -41,6 +41,7 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Deque;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT;
 import static org.apache.iotdb.db.metadata.MetadataConstant.ALL_MATCH_PATTERN;
@@ -159,12 +160,17 @@ public class ClusterSchemaTree implements ISchemaTree {
     appendSingleMeasurement(
         measurementPath,
         (MeasurementSchema) measurementPath.getMeasurementSchema(),
+        measurementPath.getTagMap(),
         measurementPath.isMeasurementAliasExists() ? measurementPath.getMeasurementAlias() : null,
         measurementPath.isUnderAlignedEntity());
   }
 
   public void appendSingleMeasurement(
-      PartialPath path, MeasurementSchema schema, String alias, boolean isAligned) {
+      PartialPath path,
+      MeasurementSchema schema,
+      Map<String, String> tagMap,
+      String alias,
+      boolean isAligned) {
     String[] nodes = path.getNodes();
     SchemaNode cur = root;
     SchemaNode child;
@@ -177,6 +183,7 @@ public class ClusterSchemaTree implements ISchemaTree {
             measurementNode.setAlias(alias);
             cur.getAsEntityNode().addAliasChild(alias, measurementNode);
           }
+          measurementNode.setTagMap(tagMap);
           child = measurementNode;
         } else if (i == nodes.length - 2) {
           SchemaEntityNode entityNode = new SchemaEntityNode(nodes[i]);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaMeasurementNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaMeasurementNode.java
index 8a2bdb5c82..d7751ef317 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaMeasurementNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaMeasurementNode.java
@@ -25,12 +25,15 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Map;
 
 public class SchemaMeasurementNode extends SchemaNode {
 
   private String alias;
   private MeasurementSchema schema;
 
+  private Map<String, String> tagMap;
+
   public SchemaMeasurementNode(String name, MeasurementSchema schema) {
     super(name);
     this.schema = schema;
@@ -48,6 +51,10 @@ public class SchemaMeasurementNode extends SchemaNode {
     return schema;
   }
 
+  public Map<String, String> getTagMap() {
+    return tagMap;
+  }
+
   @Override
   public void replaceChild(String name, SchemaNode newChild) {
     throw new UnsupportedOperationException(
@@ -68,6 +75,10 @@ public class SchemaMeasurementNode extends SchemaNode {
     this.schema = schema;
   }
 
+  public void setTagMap(Map<String, String> tagMap) {
+    this.tagMap = tagMap;
+  }
+
   @Override
   public boolean isMeasurement() {
     return true;
@@ -90,15 +101,18 @@ public class SchemaMeasurementNode extends SchemaNode {
 
     ReadWriteIOUtils.write(alias, outputStream);
     schema.serializeTo(outputStream);
+    ReadWriteIOUtils.write(tagMap, outputStream);
   }
 
   public static SchemaMeasurementNode deserialize(InputStream inputStream) throws IOException {
     String name = ReadWriteIOUtils.readString(inputStream);
     String alias = ReadWriteIOUtils.readString(inputStream);
     MeasurementSchema schema = MeasurementSchema.deserializeFrom(inputStream);
+    Map<String, String> tagMap = ReadWriteIOUtils.readMap(inputStream);
 
     SchemaMeasurementNode measurementNode = new SchemaMeasurementNode(name, schema);
     measurementNode.setAlias(alias);
+    measurementNode.setTagMap(tagMap);
     return measurementNode;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/visitor/SchemaTreeMeasurementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/visitor/SchemaTreeMeasurementVisitor.java
index 9f1e6c60ea..38b03580a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/visitor/SchemaTreeMeasurementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/visitor/SchemaTreeMeasurementVisitor.java
@@ -74,6 +74,7 @@ public class SchemaTreeMeasurementVisitor extends SchemaTreeVisitor<MeasurementP
         new MeasurementPath(
             generateFullPathNodes(nextMatchedNode),
             nextMatchedNode.getAsMeasurementNode().getSchema());
+    result.setTagMap(nextMatchedNode.getAsMeasurementNode().getTagMap());
     result.setUnderAlignedEntity(ancestorStack.peek().getNode().getAsEntityNode().isAligned());
     String alias = nextMatchedNode.getAsMeasurementNode().getAlias();
     if (nodes[nodes.length - 1].equals(alias)) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
new file mode 100644
index 0000000000..4abbd1dcb3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class TagAggregationOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+  private final List<List<String>> groups;
+  private final List<List<Aggregator>> groupedAggregators;
+  private final List<Operator> children;
+  private final TsBlock[] inputTsBlocks;
+
+  // Indicate whether a child operator's next() is called
+  private final boolean[] hasCalledNext;
+
+  // These fields record the to be consumed index of each tsBlock.
+  private final int[] consumedIndices;
+  private final TsBlockBuilder tsBlockBuilder;
+  private final long maxRetainedSize;
+  private final long childrenRetainedSize;
+  private final long maxReturnSize;
+
+  public TagAggregationOperator(
+      OperatorContext operatorContext,
+      List<List<String>> groups,
+      List<List<Aggregator>> groupedAggregators,
+      List<Operator> children,
+      long maxReturnSize) {
+    this.operatorContext = Validate.notNull(operatorContext);
+    this.groups = Validate.notNull(groups);
+    this.groupedAggregators = Validate.notNull(groupedAggregators);
+    this.children = Validate.notNull(children);
+    List<TSDataType> actualOutputColumnTypes = new ArrayList<>();
+    for (String ignored : groups.get(0)) {
+      actualOutputColumnTypes.add(TSDataType.TEXT);
+    }
+    for (int outputColumnIdx = 0;
+        outputColumnIdx < groupedAggregators.get(0).size();
+        outputColumnIdx++) {
+      for (List<Aggregator> aggregators : groupedAggregators) {
+        Aggregator aggregator = aggregators.get(outputColumnIdx);
+        if (aggregator != null) {
+          actualOutputColumnTypes.addAll(Arrays.asList(aggregator.getOutputType()));
+          break;
+        }
+      }
+    }
+    this.tsBlockBuilder = new TsBlockBuilder(actualOutputColumnTypes);
+    // Initialize input tsblocks for each aggregator group.
+    this.inputTsBlocks = new TsBlock[children.size()];
+    this.hasCalledNext = new boolean[children.size()];
+    this.consumedIndices = new int[children.size()];
+    this.maxRetainedSize = children.stream().mapToLong(Operator::calculateMaxReturnSize).sum();
+    this.childrenRetainedSize =
+        children.stream().mapToLong(Operator::calculateRetainedSizeAfterCallingNext).sum();
+    this.maxReturnSize = maxReturnSize;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public TsBlock next() {
+    Arrays.fill(hasCalledNext, false);
+    long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+    long start = System.nanoTime();
+    boolean successful = true;
+    while (System.nanoTime() - start < maxRuntime && !tsBlockBuilder.isFull() && successful) {
+      successful = processOneRow();
+    }
+    TsBlock tsBlock = null;
+    if (tsBlockBuilder.getPositionCount() > 0) {
+      tsBlock = tsBlockBuilder.build();
+    }
+    tsBlockBuilder.reset();
+    return tsBlock;
+  }
+
+  private boolean processOneRow() {
+    for (int i = 0; i < children.size(); i++) {
+      // If the data is unavailable first, try to find next tsblock of the child.
+      if (dataUnavailable(i) && !hasCalledNext[i]) {
+        inputTsBlocks[i] = children.get(i).next();
+        consumedIndices[i] = 0;
+        hasCalledNext[i] = true;
+      }
+
+      // If it's still unavailable, then blocked by children i.
+      if (dataUnavailable(i)) {
+        return false;
+      }
+    }
+
+    TsBlock[] rowBlocks = new TsBlock[children.size()];
+    for (int i = 0; i < children.size(); i++) {
+      rowBlocks[i] = inputTsBlocks[i].getRegion(consumedIndices[i], 1);
+    }
+    for (int groupIdx = 0; groupIdx < groups.size(); groupIdx++) {
+      List<String> group = groups.get(groupIdx);
+      List<Aggregator> aggregators = groupedAggregators.get(groupIdx);
+
+      for (Aggregator aggregator : aggregators) {
+        if (aggregator == null) {
+          continue;
+        }
+        aggregator.reset();
+        aggregator.processTsBlocks(rowBlocks);
+      }
+
+      TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
+      timeColumnBuilder.writeLong(rowBlocks[0].getStartTime());
+      ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
+
+      for (int i = 0; i < group.size(); i++) {
+        if (group.get(i) == null) {
+          columnBuilders[i].writeBinary(new Binary("NULL"));
+        } else {
+          columnBuilders[i].writeBinary(new Binary(group.get(i)));
+        }
+      }
+      for (int i = 0; i < aggregators.size(); i++) {
+        Aggregator aggregator = aggregators.get(i);
+        ColumnBuilder columnBuilder = columnBuilders[i + group.size()];
+        if (aggregator == null) {
+          columnBuilder.appendNull();
+        } else {
+          aggregator.outputResult(new ColumnBuilder[] {columnBuilder});
+        }
+      }
+      tsBlockBuilder.declarePosition();
+    }
+
+    // Reset dataReady for next iteration
+    for (int i = 0; i < children.size(); i++) {
+      consumedIndices[i]++;
+    }
+    return true;
+  }
+
+  @Override
+  public boolean hasNext() {
+    for (int i = 0; i < children.size(); i++) {
+      if (dataUnavailable(i) && !children.get(i).hasNext()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public boolean isFinished() {
+    return !this.hasNext();
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
+    for (int i = 0; i < children.size(); i++) {
+      if (dataUnavailable(i)) {
+        ListenableFuture<?> blocked = children.get(i).isBlocked();
+        if (!blocked.isDone()) {
+          listenableFutures.add(blocked);
+        }
+      }
+    }
+    return listenableFutures.isEmpty() ? NOT_BLOCKED : Futures.successfulAsList(listenableFutures);
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return maxReturnSize + maxRetainedSize + childrenRetainedSize;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return maxRetainedSize + childrenRetainedSize;
+  }
+
+  @Override
+  public void close() throws Exception {
+    for (Operator child : children) {
+      child.close();
+    }
+  }
+
+  private boolean dataUnavailable(int index) {
+    return inputTsBlocks[index] == null
+        || consumedIndices[index] == inputTsBlocks[index].getPositionCount();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
index 708725289e..be0173de87 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
@@ -56,6 +56,7 @@ public class SchemaFetchScanOperator implements SourceOperator {
   private final Map<Integer, Template> templateMap;
 
   private final ISchemaRegion schemaRegion;
+  private final boolean withTags;
 
   private boolean isFinished = false;
 
@@ -64,12 +65,14 @@ public class SchemaFetchScanOperator implements SourceOperator {
       OperatorContext context,
       PathPatternTree patternTree,
       Map<Integer, Template> templateMap,
-      ISchemaRegion schemaRegion) {
+      ISchemaRegion schemaRegion,
+      boolean withTags) {
     this.sourceId = planNodeId;
     this.operatorContext = context;
     this.patternTree = patternTree;
     this.schemaRegion = schemaRegion;
     this.templateMap = templateMap;
+    this.withTags = withTags;
   }
 
   @Override
@@ -110,7 +113,7 @@ public class SchemaFetchScanOperator implements SourceOperator {
     ClusterSchemaTree schemaTree = new ClusterSchemaTree();
     List<PartialPath> partialPathList = patternTree.getAllPathPatterns();
     for (PartialPath path : partialPathList) {
-      schemaTree.appendMeasurementPaths(schemaRegion.fetchSchema(path, templateMap));
+      schemaTree.appendMeasurementPaths(schemaRegion.fetchSchema(path, templateMap, withTags));
     }
 
     ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
index e3c70fbbf2..8e346a03a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
@@ -87,8 +87,24 @@ public class Analysis {
   // all aggregations that need to be calculated
   private Set<Expression> aggregationExpressions;
 
-  // map from grouped path name to list of input aggregation in `GROUP BY LEVEL` clause
-  private Map<Expression, Set<Expression>> groupByLevelExpressions;
+  // An ordered map from cross-timeseries aggregation to list of inner-timeseries aggregations. The
+  // keys' order is the output one.
+  private LinkedHashMap<Expression, Set<Expression>> crossGroupByExpressions;
+
+  // tag keys specified in `GROUP BY TAG` clause
+  private List<String> tagKeys;
+
+  // {tag values -> {grouped expression -> output expressions}}
+  // For different combination of tag keys, the grouped expression may be different. Let's say there
+  // are 3 timeseries root.sg.d1.temperature, root.sg.d1.status, root.sg.d2.temperature, and their
+  // tags are [k1=v1], [k1=v1] and [k1=v2] respectively. For query "SELECT last_value(**) FROM root
+  // GROUP BY k1", timeseries are grouped by their tags into 2 buckets. Bucket [v1] has
+  // [root.sg.d1.temperature, root.sg.d1.status], while bucket [v2] has [root.sg.d2.temperature].
+  // Thus, the aggregation results of bucket [v1] and [v2] are different. Bucket [v1] has 2
+  // aggregation results last_value(temperature) and last_value(status), whereas bucket [v2] only
+  // has [last_value(temperature)].
+  private Map<List<String>, LinkedHashMap<Expression, List<Expression>>>
+      tagValuesToGroupedTimeseriesOperands;
 
   /////////////////////////////////////////////////////////////////////////////////////////////////
   // Query Analysis (used in ALIGN BY DEVICE)
@@ -226,12 +242,13 @@ public class Analysis {
         || (schemaPartition != null && !schemaPartition.isEmpty());
   }
 
-  public Map<Expression, Set<Expression>> getGroupByLevelExpressions() {
-    return groupByLevelExpressions;
+  public LinkedHashMap<Expression, Set<Expression>> getCrossGroupByExpressions() {
+    return crossGroupByExpressions;
   }
 
-  public void setGroupByLevelExpressions(Map<Expression, Set<Expression>> groupByLevelExpressions) {
-    this.groupByLevelExpressions = groupByLevelExpressions;
+  public void setCrossGroupByExpressions(
+      LinkedHashMap<Expression, Set<Expression>> crossGroupByExpressions) {
+    this.crossGroupByExpressions = crossGroupByExpressions;
   }
 
   public FillDescriptor getFillDescriptor() {
@@ -419,4 +436,23 @@ public class Analysis {
   public void setDeviceViewOutputExpressions(Set<Expression> deviceViewOutputExpressions) {
     this.deviceViewOutputExpressions = deviceViewOutputExpressions;
   }
+
+  public List<String> getTagKeys() {
+    return tagKeys;
+  }
+
+  public void setTagKeys(List<String> tagKeys) {
+    this.tagKeys = tagKeys;
+  }
+
+  public Map<List<String>, LinkedHashMap<Expression, List<Expression>>>
+      getTagValuesToGroupedTimeseriesOperands() {
+    return tagValuesToGroupedTimeseriesOperands;
+  }
+
+  public void setTagValuesToGroupedTimeseriesOperands(
+      Map<List<String>, LinkedHashMap<Expression, List<Expression>>>
+          tagValuesToGroupedTimeseriesOperands) {
+    this.tagValuesToGroupedTimeseriesOperands = tagValuesToGroupedTimeseriesOperands;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 25fa7a2538..92a70eddbf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -50,7 +50,9 @@ import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.ConstantOperand;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
@@ -193,7 +195,12 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
 
       // request schema fetch API
       logger.info("[StartFetchSchema]");
-      ISchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree);
+      ISchemaTree schemaTree;
+      if (queryStatement.isGroupByTag()) {
+        schemaTree = schemaFetcher.fetchSchemaWithTags(patternTree);
+      } else {
+        schemaTree = schemaFetcher.fetchSchema(patternTree);
+      }
       logger.info("[EndFetchSchema]");
       // If there is no leaf node in the schema tree, the query should be completed immediately
       if (schemaTree.isEmpty()) {
@@ -236,7 +243,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
 
         analyzeHaving(analysis, queryStatement, schemaTree);
         analyzeGroupByLevel(analysis, queryStatement, outputExpressions);
-
+        analyzeGroupByTag(analysis, queryStatement, outputExpressions, schemaTree);
         Set<Expression> selectExpressions =
             outputExpressions.stream()
                 .map(Pair::getLeft)
@@ -583,7 +590,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       groupedSelectExpressions.add(groupedExpression);
     }
 
-    Map<Expression, Set<Expression>> groupByLevelExpressions = new LinkedHashMap<>();
+    LinkedHashMap<Expression, Set<Expression>> groupByLevelExpressions = new LinkedHashMap<>();
     if (queryStatement.hasHaving()) {
       // update havingExpression
       Expression havingExpression = groupByLevelController.control(analysis.getHavingExpression());
@@ -623,7 +630,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     }
 
     checkDataTypeConsistencyInGroupByLevel(analysis, groupByLevelExpressions);
-    analysis.setGroupByLevelExpressions(groupByLevelExpressions);
+    analysis.setCrossGroupByExpressions(groupByLevelExpressions);
   }
 
   private void checkDataTypeConsistencyInGroupByLevel(
@@ -665,6 +672,85 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     return new Pair<>(expressionWithoutAlias, alias);
   }
 
+  /**
+   * This method is used to analyze the GROUP BY TAGS query.
+   *
+   * <p>TODO: support slimit/soffset/value filter
+   */
+  private void analyzeGroupByTag(
+      Analysis analysis,
+      QueryStatement queryStatement,
+      List<Pair<Expression, String>> outputExpressions,
+      ISchemaTree schemaTree) {
+    if (!queryStatement.isGroupByTag()) {
+      return;
+    }
+    if (analysis.hasValueFilter()) {
+      throw new SemanticException("Only time filters are supported in GROUP BY TAGS query");
+    }
+    Map<List<String>, LinkedHashMap<Expression, List<Expression>>>
+        tagValuesToGroupedTimeseriesOperands = new HashMap<>();
+    LinkedHashMap<Expression, Set<Expression>> groupByTagOutputExpressions = new LinkedHashMap<>();
+    List<String> tagKeys = queryStatement.getGroupByTagComponent().getTagKeys();
+    List<MeasurementPath> allSelectedPath = schemaTree.getAllMeasurement();
+    Map<MeasurementPath, Map<String, String>> queriedTagMap = new HashMap<>();
+    allSelectedPath.forEach(v -> queriedTagMap.put(v, v.getTagMap()));
+
+    for (Pair<Expression, String> outputExpressionAndAlias : outputExpressions) {
+      if (!(outputExpressionAndAlias.getLeft() instanceof FunctionExpression
+          && outputExpressionAndAlias.getLeft().getExpressions().get(0) instanceof TimeSeriesOperand
+          && outputExpressionAndAlias.getLeft().isBuiltInAggregationFunctionExpression())) {
+        throw new SemanticException(
+            outputExpressionAndAlias.getLeft()
+                + " can't be used in group by tag. It will be supported in the future.");
+      }
+      FunctionExpression outputExpression = (FunctionExpression) outputExpressionAndAlias.getLeft();
+      MeasurementPath measurementPath =
+          (MeasurementPath)
+              ((TimeSeriesOperand) outputExpression.getExpressions().get(0)).getPath();
+      MeasurementPath fakePath = null;
+      try {
+        fakePath =
+            new MeasurementPath(measurementPath.getMeasurement(), measurementPath.getSeriesType());
+      } catch (IllegalPathException e) {
+        // do nothing
+      }
+      Expression measurementExpression = new TimeSeriesOperand(fakePath);
+      Expression groupedExpression =
+          new FunctionExpression(
+              outputExpression.getFunctionName(),
+              outputExpression.getFunctionAttributes(),
+              Collections.singletonList(measurementExpression));
+      groupByTagOutputExpressions
+          .computeIfAbsent(groupedExpression, v -> new HashSet<>())
+          .add(outputExpression);
+      Map<String, String> tagMap = queriedTagMap.get(measurementPath);
+      List<String> tagValues = new ArrayList<>();
+      for (String tagKey : tagKeys) {
+        tagValues.add(tagMap.get(tagKey));
+      }
+      tagValuesToGroupedTimeseriesOperands
+          .computeIfAbsent(tagValues, key -> new LinkedHashMap<>())
+          .computeIfAbsent(groupedExpression, key -> new ArrayList<>())
+          .add(outputExpression.getExpressions().get(0));
+    }
+
+    outputExpressions.clear();
+    for (String tagKey : tagKeys) {
+      Expression tagKeyExpression = new ConstantOperand(TSDataType.TEXT, tagKey);
+      analyzeExpression(analysis, tagKeyExpression);
+      outputExpressions.add(new Pair<>(tagKeyExpression, null));
+    }
+    for (Expression groupByTagOutputExpression : groupByTagOutputExpressions.keySet()) {
+      // TODO: support alias
+      analyzeExpression(analysis, groupByTagOutputExpression);
+      outputExpressions.add(new Pair<>(groupByTagOutputExpression, null));
+    }
+    analysis.setTagKeys(queryStatement.getGroupByTagComponent().getTagKeys());
+    analysis.setTagValuesToGroupedTimeseriesOperands(tagValuesToGroupedTimeseriesOperands);
+    analysis.setCrossGroupByExpressions(groupByTagOutputExpressions);
+  }
+
   private void analyzeDeviceToAggregation(
       Analysis analysis,
       QueryStatement queryStatement,
@@ -692,9 +778,9 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       return;
     }
 
-    if (queryStatement.isGroupByLevel()) {
+    if (queryStatement.isGroupByLevel() || queryStatement.isGroupByTag()) {
       Set<Expression> aggregationExpressions =
-          analysis.getGroupByLevelExpressions().values().stream()
+          analysis.getCrossGroupByExpressions().values().stream()
               .flatMap(Set::stream)
               .collect(Collectors.toSet());
       analysis.setAggregationExpressions(aggregationExpressions);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index eec0eef54f..1be0ae7af3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -93,12 +93,21 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
 
   @Override
   public ClusterSchemaTree fetchSchema(PathPatternTree patternTree) {
+    return fetchSchema(patternTree, false);
+  }
+
+  @Override
+  public ClusterSchemaTree fetchSchemaWithTags(PathPatternTree patternTree) {
+    return fetchSchema(patternTree, true);
+  }
+
+  private ClusterSchemaTree fetchSchema(PathPatternTree patternTree, boolean withTags) {
     Map<Integer, Template> templateMap = new HashMap<>();
     patternTree.constructTree();
     for (PartialPath pattern : patternTree.getAllPathPatterns()) {
       templateMap.putAll(templateManager.checkAllRelatedTemplate(pattern));
     }
-    return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap));
+    return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags));
   }
 
   private ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) {
@@ -361,6 +370,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
               devicePath.concatNode(entry.getKey()),
               (MeasurementSchema) entry.getValue(),
               null,
+              null,
               template.isDirectAligned());
         }
 
@@ -451,6 +461,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
           new MeasurementSchema(
               measurements.get(i), tsDataTypes.get(i), encodings.get(i), compressors.get(i)),
           null,
+          null,
           isAligned);
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
index 89acdc9d59..0f9e31dae6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
@@ -1067,6 +1067,8 @@ public class ExpressionAnalyzer {
       return !expression.isBuiltInAggregationFunctionExpression();
     } else if (expression instanceof TimeSeriesOperand) {
       return false;
+    } else if (expression instanceof ConstantOperand) {
+      return false;
     } else {
       throw new IllegalArgumentException(
           "unsupported expression type: " + expression.getExpressionType());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
index a688949cbb..5ec75fe1da 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
@@ -39,6 +39,8 @@ public interface ISchemaFetcher {
 
   ISchemaTree fetchSchema(PathPatternTree patternTree);
 
+  ISchemaTree fetchSchemaWithTags(PathPatternTree patternTree);
+
   ISchemaTree fetchSchemaWithAutoCreate(
       PartialPath devicePath,
       String[] measurements,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
index f4060e7c4e..72f10eaa87 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
@@ -60,6 +60,15 @@ public class StandaloneSchemaFetcher implements ISchemaFetcher {
 
   @Override
   public ClusterSchemaTree fetchSchema(PathPatternTree patternTree) {
+    return fetchSchema(patternTree, false);
+  }
+
+  @Override
+  public ClusterSchemaTree fetchSchemaWithTags(PathPatternTree patternTree) {
+    return fetchSchema(patternTree, true);
+  }
+
+  private ClusterSchemaTree fetchSchema(PathPatternTree patternTree, boolean withTags) {
     patternTree.constructTree();
     Set<String> storageGroupSet = new HashSet<>();
     ClusterSchemaTree schemaTree = new ClusterSchemaTree();
@@ -72,7 +81,8 @@ public class StandaloneSchemaFetcher implements ISchemaFetcher {
           SchemaRegionId schemaRegionId =
               localConfigNode.getBelongedSchemaRegionId(storageGroupPath);
           ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
-          schemaTree.appendMeasurementPaths(schemaRegion.getMeasurementPaths(pathPattern, false));
+          schemaTree.appendMeasurementPaths(
+              schemaRegion.getMeasurementPaths(pathPattern, false, withTags));
         }
       }
     } catch (MetadataException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 988515f530..bd0925acaf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -67,6 +67,7 @@ import org.apache.iotdb.db.mpp.plan.statement.component.FillComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
 import org.apache.iotdb.db.mpp.plan.statement.component.FromComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByLevelComponent;
+import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTagComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.HavingCondition;
 import org.apache.iotdb.db.mpp.plan.statement.component.OrderByComponent;
@@ -150,6 +151,9 @@ import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.CountTimeseriesContext;
 import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.CreateFunctionContext;
 import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.DropFunctionContext;
 import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.ExpressionContext;
+import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.GroupByTagClauseContext;
+import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.GroupByTagStatementContext;
+import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.IdentifierContext;
 import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.ShowFunctionsContext;
 import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.UriContext;
 import org.apache.iotdb.db.qp.sql.IoTDBSqlParserBaseVisitor;
@@ -974,6 +978,14 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
       queryStatement.setGroupByLevelComponent(groupByLevelComponent);
     }
 
+    if (ctx.TAGS() != null) {
+      List<String> tagKeys = new ArrayList<>();
+      for (IdentifierContext identifierContext : ctx.identifier()) {
+        tagKeys.add(identifierContext.getText());
+      }
+      queryStatement.setGroupByTagComponent(new GroupByTagComponent(tagKeys));
+    }
+
     // parse fill clause
     if (ctx.fillClause() != null) {
       parseFillClause(ctx.fillClause());
@@ -1095,6 +1107,31 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
     queryStatement.setHavingCondition(new HavingCondition(predicate));
   }
 
+  @Override
+  public Statement visitGroupByTagStatement(GroupByTagStatementContext ctx) {
+    // parse group by tag clause
+    parseGroupByTagClause(ctx.groupByTagClause());
+
+    // parse order by time
+    if (ctx.orderByClause() != null) {
+      parseOrderByClause(ctx.orderByClause());
+    }
+
+    return queryStatement;
+  }
+
+  public void parseGroupByTagClause(GroupByTagClauseContext ctx) {
+    Set<String> tagKeys = new LinkedHashSet<>();
+    for (IdentifierContext identifierContext : ctx.identifier()) {
+      String key = parseIdentifier(identifierContext.getText());
+      if (tagKeys.contains(key)) {
+        throw new SemanticException("duplicated key in GROUP BY TAGS: " + key);
+      }
+      tagKeys.add(key);
+    }
+    queryStatement.setGroupByTagComponent(new GroupByTagComponent(new ArrayList<>(tagKeys)));
+  }
+
   // Fill Clause
   @Override
   public Statement visitFillStatement(IoTDBSqlParser.FillStatementContext ctx) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index 894c500112..965c4bd477 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -56,6 +56,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
@@ -71,8 +72,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSo
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
@@ -92,6 +93,8 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -124,6 +127,9 @@ public class LogicalPlanBuilder {
   }
 
   private void updateTypeProvider(Collection<Expression> expressions) {
+    if (expressions == null) {
+      return;
+    }
     expressions.forEach(
         expression -> {
           if (!expression.getExpressionString().equals(COLUMN_DEVICE)) {
@@ -134,6 +140,13 @@ public class LogicalPlanBuilder {
         });
   }
 
+  private void updateTypeProviderWithConstantType(List<String> keys, TSDataType dataType) {
+    if (keys == null) {
+      return;
+    }
+    keys.forEach(k -> context.getTypeProvider().setType(k, dataType));
+  }
+
   public LogicalPlanBuilder planRawDataSource(
       Set<Expression> sourceExpressions, Ordering scanOrder, Filter timeFilter) {
     List<PlanNode> sourceNodeList = new ArrayList<>();
@@ -206,7 +219,10 @@ public class LogicalPlanBuilder {
       GroupByTimeParameter groupByTimeParameter,
       Set<Expression> aggregationExpressions,
       Set<Expression> sourceTransformExpressions,
-      Map<Expression, Set<Expression>> groupByLevelExpressions) {
+      LinkedHashMap<Expression, Set<Expression>> crossGroupByAggregations,
+      List<String> tagKeys,
+      Map<List<String>, LinkedHashMap<Expression, List<Expression>>>
+          tagValuesToGroupedTimeseriesOperands) {
     boolean needCheckAscending = groupByTimeParameter == null;
     Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new HashMap<>();
     Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new HashMap<>();
@@ -236,7 +252,9 @@ public class LogicalPlanBuilder {
         scanOrder,
         groupByTimeParameter,
         aggregationExpressions,
-        groupByLevelExpressions);
+        crossGroupByAggregations,
+        tagKeys,
+        tagValuesToGroupedTimeseriesOperands);
   }
 
   public LogicalPlanBuilder planAggregationSourceWithIndexAdjust(
@@ -246,7 +264,7 @@ public class LogicalPlanBuilder {
       GroupByTimeParameter groupByTimeParameter,
       Set<Expression> aggregationExpressions,
       Set<Expression> sourceTransformExpressions,
-      Map<Expression, Set<Expression>> groupByLevelExpressions,
+      LinkedHashMap<Expression, Set<Expression>> crossGroupByExpressions,
       List<Integer> deviceViewInputIndexes) {
     checkArgument(
         aggregationExpressions.size() == deviceViewInputIndexes.size(),
@@ -300,7 +318,9 @@ public class LogicalPlanBuilder {
         scanOrder,
         groupByTimeParameter,
         aggregationExpressions,
-        groupByLevelExpressions);
+        crossGroupByExpressions,
+        null,
+        null);
   }
 
   private AggregationDescriptor createAggregationDescriptor(
@@ -376,11 +396,14 @@ public class LogicalPlanBuilder {
       Ordering scanOrder,
       GroupByTimeParameter groupByTimeParameter,
       Set<Expression> aggregationExpressions,
-      Map<Expression, Set<Expression>> groupByLevelExpressions) {
+      LinkedHashMap<Expression, Set<Expression>> crossGroupByExpressions,
+      List<String> tagKeys,
+      Map<List<String>, LinkedHashMap<Expression, List<Expression>>>
+          tagValuesToGroupedTimeseriesOperands) {
     if (curStep.isOutputPartial()) {
       if (groupByTimeParameter != null && groupByTimeParameter.hasOverlap()) {
         curStep =
-            groupByLevelExpressions != null ? AggregationStep.INTERMEDIATE : AggregationStep.FINAL;
+            crossGroupByExpressions != null ? AggregationStep.INTERMEDIATE : AggregationStep.FINAL;
 
         this.root = convergeWithTimeJoin(sourceNodeList, scanOrder);
 
@@ -388,23 +411,46 @@ public class LogicalPlanBuilder {
             createSlidingWindowAggregationNode(
                 this.getRoot(), aggregationExpressions, groupByTimeParameter, curStep, scanOrder);
 
-        if (groupByLevelExpressions != null) {
+        if (crossGroupByExpressions != null) {
+          curStep = AggregationStep.FINAL;
+          if (tagKeys != null) {
+            this.root =
+                createGroupByTagNode(
+                    tagKeys,
+                    tagValuesToGroupedTimeseriesOperands,
+                    crossGroupByExpressions.keySet(),
+                    Collections.singletonList(this.getRoot()),
+                    curStep,
+                    groupByTimeParameter,
+                    scanOrder);
+          } else {
+            this.root =
+                createGroupByTLevelNode(
+                    Collections.singletonList(this.getRoot()),
+                    crossGroupByExpressions,
+                    curStep,
+                    groupByTimeParameter,
+                    scanOrder);
+          }
+        }
+      } else {
+        if (tagKeys != null) {
           curStep = AggregationStep.FINAL;
           this.root =
-              createGroupByTLevelNode(
-                  Collections.singletonList(this.getRoot()),
-                  groupByLevelExpressions,
+              createGroupByTagNode(
+                  tagKeys,
+                  tagValuesToGroupedTimeseriesOperands,
+                  crossGroupByExpressions.keySet(),
+                  sourceNodeList,
                   curStep,
                   groupByTimeParameter,
                   scanOrder);
-        }
-      } else {
-        if (groupByLevelExpressions != null) {
+        } else if (crossGroupByExpressions != null) {
           curStep = AggregationStep.FINAL;
           this.root =
               createGroupByTLevelNode(
                   sourceNodeList,
-                  groupByLevelExpressions,
+                  crossGroupByExpressions,
                   curStep,
                   groupByTimeParameter,
                   scanOrder);
@@ -432,7 +478,7 @@ public class LogicalPlanBuilder {
   }
 
   public static void updateTypeProviderByPartialAggregation(
-      GroupByLevelDescriptor aggregationDescriptor, TypeProvider typeProvider) {
+      CrossSeriesAggregationDescriptor aggregationDescriptor, TypeProvider typeProvider) {
     List<AggregationType> splitAggregations =
         SchemaUtils.splitPartialAggregation(aggregationDescriptor.getAggregationType());
     PartialPath path = ((TimeSeriesOperand) aggregationDescriptor.getOutputExpression()).getPath();
@@ -570,10 +616,10 @@ public class LogicalPlanBuilder {
       AggregationStep curStep,
       GroupByTimeParameter groupByTimeParameter,
       Ordering scanOrder) {
-    List<GroupByLevelDescriptor> groupByLevelDescriptors = new ArrayList<>();
+    List<CrossSeriesAggregationDescriptor> groupByLevelDescriptors = new ArrayList<>();
     for (Expression groupedExpression : groupByLevelExpressions.keySet()) {
       groupByLevelDescriptors.add(
-          new GroupByLevelDescriptor(
+          new CrossSeriesAggregationDescriptor(
               ((FunctionExpression) groupedExpression).getFunctionName(),
               curStep,
               groupByLevelExpressions.get(groupedExpression).stream()
@@ -585,7 +631,7 @@ public class LogicalPlanBuilder {
     updateTypeProvider(groupByLevelExpressions.keySet());
     updateTypeProvider(
         groupByLevelDescriptors.stream()
-            .map(GroupByLevelDescriptor::getOutputExpression)
+            .map(CrossSeriesAggregationDescriptor::getOutputExpression)
             .collect(Collectors.toList()));
     return new GroupByLevelNode(
         context.getQueryId().genPlanNodeId(),
@@ -595,6 +641,59 @@ public class LogicalPlanBuilder {
         scanOrder);
   }
 
+  private PlanNode createGroupByTagNode(
+      List<String> tagKeys,
+      Map<List<String>, LinkedHashMap<Expression, List<Expression>>>
+          tagValuesToGroupedTimeseriesOperands,
+      Collection<Expression> groupByTagOutputExpressions,
+      List<PlanNode> children,
+      AggregationStep curStep,
+      GroupByTimeParameter groupByTimeParameter,
+      Ordering scanOrder) {
+    Map<List<String>, List<CrossSeriesAggregationDescriptor>> tagValuesToAggregationDescriptors =
+        new HashMap<>();
+    for (List<String> tagValues : tagValuesToGroupedTimeseriesOperands.keySet()) {
+      LinkedHashMap<Expression, List<Expression>> groupedTimeseriesOperands =
+          tagValuesToGroupedTimeseriesOperands.get(tagValues);
+      List<CrossSeriesAggregationDescriptor> aggregationDescriptors = new ArrayList<>();
+
+      Iterator<Expression> iter = groupedTimeseriesOperands.keySet().iterator();
+      for (Expression groupByTagOutputExpression : groupByTagOutputExpressions) {
+        if (!iter.hasNext()) {
+          aggregationDescriptors.add(null);
+          continue;
+        }
+        Expression next = iter.next();
+        if (next.equals(groupByTagOutputExpression)) {
+          String functionName = ((FunctionExpression) next).getFunctionName().toUpperCase();
+          CrossSeriesAggregationDescriptor aggregationDescriptor =
+              new CrossSeriesAggregationDescriptor(
+                  functionName,
+                  curStep,
+                  groupedTimeseriesOperands.get(next),
+                  next.getExpressions().get(0));
+          aggregationDescriptors.add(aggregationDescriptor);
+        } else {
+          aggregationDescriptors.add(null);
+        }
+      }
+      tagValuesToAggregationDescriptors.put(tagValues, aggregationDescriptors);
+    }
+
+    updateTypeProvider(groupByTagOutputExpressions);
+    updateTypeProviderWithConstantType(tagKeys, TSDataType.TEXT);
+    return new GroupByTagNode(
+        context.getQueryId().genPlanNodeId(),
+        children,
+        groupByTimeParameter,
+        scanOrder,
+        tagKeys,
+        tagValuesToAggregationDescriptors,
+        groupByTagOutputExpressions.stream()
+            .map(Expression::toString)
+            .collect(Collectors.toList()));
+  }
+
   private SeriesAggregationSourceNode createAggregationScanNode(
       PartialPath selectPath,
       List<AggregationDescriptor> aggregationDescriptorList,
@@ -805,7 +904,8 @@ public class LogicalPlanBuilder {
   public LogicalPlanBuilder planSchemaFetchSource(
       List<String> storageGroupList,
       PathPatternTree patternTree,
-      Map<Integer, Template> templateMap) {
+      Map<Integer, Template> templateMap,
+      boolean withTags) {
     PartialPath storageGroupPath;
     for (String storageGroup : storageGroupList) {
       try {
@@ -822,7 +922,8 @@ public class LogicalPlanBuilder {
                 context.getQueryId().genPlanNodeId(),
                 storageGroupPath,
                 overlappedPatternTree,
-                templateMap));
+                templateMap,
+                withTags));
       } catch (IllegalPathException e) {
         // definitely won't happen
         throw new RuntimeException(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index 267785f607..2f2f954520 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -232,13 +232,13 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
         if (queryStatement.isGroupByLevel()) {
           planBuilder =
               planBuilder.planGroupByLevel(
-                  analysis.getGroupByLevelExpressions(),
+                  analysis.getCrossGroupByExpressions(),
                   analysis.getGroupByTimeParameter(),
                   queryStatement.getResultTimeOrder());
         }
       } else {
         curStep =
-            (analysis.getGroupByLevelExpressions() != null
+            (analysis.getCrossGroupByExpressions() != null
                     || (analysis.getGroupByTimeParameter() != null
                         && analysis.getGroupByTimeParameter().hasOverlap()))
                 ? AggregationStep.PARTIAL
@@ -253,7 +253,9 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
                     analysis.getGroupByTimeParameter(),
                     aggregationExpressions,
                     sourceTransformExpressions,
-                    analysis.getGroupByLevelExpressions())
+                    analysis.getCrossGroupByExpressions(),
+                    analysis.getTagKeys(),
+                    analysis.getTagValuesToGroupedTimeseriesOperands())
                 : planBuilder.planAggregationSourceWithIndexAdjust(
                     curStep,
                     queryStatement.getResultTimeOrder(),
@@ -261,7 +263,7 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
                     analysis.getGroupByTimeParameter(),
                     aggregationExpressions,
                     sourceTransformExpressions,
-                    analysis.getGroupByLevelExpressions(),
+                    analysis.getCrossGroupByExpressions(),
                     deviceViewInputIndexes);
       }
     }
@@ -632,7 +634,8 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
         .planSchemaFetchSource(
             storageGroupList,
             schemaFetchStatement.getPatternTree(),
-            schemaFetchStatement.getTemplateMap())
+            schemaFetchStatement.getTemplateMap(),
+            schemaFetchStatement.isWithTags())
         .getRoot();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 1d364bf09a..f3de5e72d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -49,6 +49,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.TagAggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
@@ -135,6 +136,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
@@ -152,8 +154,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OutputColumn;
@@ -180,6 +182,7 @@ import org.apache.commons.lang3.Validate;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -1051,8 +1054,9 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     boolean ascending = node.getScanOrder() == Ordering.ASC;
     List<Aggregator> aggregators = new ArrayList<>();
     Map<String, List<InputLocation>> layout = makeLayout(node);
-    List<GroupByLevelDescriptor> aggregationDescriptors = node.getGroupByLevelDescriptors();
-    for (GroupByLevelDescriptor descriptor : aggregationDescriptors) {
+    List<CrossSeriesAggregationDescriptor> aggregationDescriptors =
+        node.getGroupByLevelDescriptors();
+    for (CrossSeriesAggregationDescriptor descriptor : aggregationDescriptors) {
       List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout);
       TSDataType seriesDataType =
           context
@@ -1086,6 +1090,69 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         operatorContext, aggregators, timeRangeIterator, children, maxReturnSize);
   }
 
+  @Override
+  public Operator visitGroupByTag(GroupByTagNode node, LocalExecutionPlanContext context) {
+    checkArgument(node.getTagKeys().size() >= 1, "GroupByTag tag keys cannot be empty");
+    checkArgument(
+        node.getTagValuesToAggregationDescriptors().size() >= 1,
+        "GroupByTag aggregation descriptors cannot be empty");
+
+    List<Operator> children =
+        node.getChildren().stream()
+            .map(child -> child.accept(this, context))
+            .collect(Collectors.toList());
+
+    boolean ascending = node.getScanOrder() == Ordering.ASC;
+    Map<String, List<InputLocation>> layout = makeLayout(node);
+    List<List<String>> groups = new ArrayList<>();
+    List<List<Aggregator>> groupedAggregators = new ArrayList<>();
+    int aggregatorCount = 0;
+    for (Map.Entry<List<String>, List<CrossSeriesAggregationDescriptor>> entry :
+        node.getTagValuesToAggregationDescriptors().entrySet()) {
+      groups.add(entry.getKey());
+      List<Aggregator> aggregators = new ArrayList<>();
+      for (CrossSeriesAggregationDescriptor aggregationDescriptor : entry.getValue()) {
+        if (aggregationDescriptor == null) {
+          aggregators.add(null);
+          continue;
+        }
+        List<InputLocation[]> inputLocations = calcInputLocationList(aggregationDescriptor, layout);
+        TSDataType seriesDataType =
+            context
+                .getTypeProvider()
+                .getType(aggregationDescriptor.getInputExpressions().get(0).getExpressionString());
+        aggregators.add(
+            new Aggregator(
+                AccumulatorFactory.createAccumulator(
+                    aggregationDescriptor.getAggregationType(), seriesDataType, ascending),
+                aggregationDescriptor.getStep(),
+                inputLocations));
+      }
+      groupedAggregators.add(aggregators);
+      aggregatorCount += aggregators.size();
+    }
+    GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+    ITimeRangeIterator timeRangeIterator =
+        initTimeRangeIterator(groupByTimeParameter, ascending, false);
+    List<AggregationDescriptor> aggregationDescriptors =
+        node.getTagValuesToAggregationDescriptors().values().stream()
+            .flatMap(Collection::stream)
+            .collect(Collectors.toList());
+    long maxReturnSize =
+        calculateMaxAggregationResultSize(
+            aggregationDescriptors, timeRangeIterator, context.getTypeProvider());
+    OperatorContext operatorContext =
+        context
+            .getInstanceContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                TagAggregationOperator.class.getSimpleName());
+    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregatorCount);
+    return new TagAggregationOperator(
+        operatorContext, groups, groupedAggregators, children, maxReturnSize);
+  }
+
   @Override
   public Operator visitSlidingWindowAggregation(
       SlidingWindowAggregationNode node, LocalExecutionPlanContext context) {
@@ -1417,8 +1484,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         operatorContext,
         node.getPatternTree(),
         node.getTemplateMap(),
-        ((SchemaDriverContext) (context.getInstanceContext().getDriverContext()))
-            .getSchemaRegion());
+        ((SchemaDriverContext) (context.getInstanceContext().getDriverContext())).getSchemaRegion(),
+        node.isWithTags());
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java
index 6eb323b078..a95adc10c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.SimplePlanVisitor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
@@ -96,6 +97,14 @@ public class SubPlanTypeExtractor {
       return visitPlan(node, context);
     }
 
+    @Override
+    public Void visitGroupByTag(GroupByTagNode node, Void context) {
+      node.getTagValuesToAggregationDescriptors()
+          .values()
+          .forEach(this::updateTypeProviderByAggregationDescriptor);
+      return visitPlan(node, context);
+    }
+
     private void updateTypeProviderByAggregationDescriptor(
         List<? extends AggregationDescriptor> aggregationDescriptorList) {
       aggregationDescriptorList.stream()
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
index d21bf1a195..37c487b4ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
@@ -236,6 +237,10 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
     return processOneChildNode(node, context);
   }
 
+  public PlanNode visitGroupByTag(GroupByTagNode node, NodeGroupContext context) {
+    return processMultiChildNode(node, context);
+  }
+
   private PlanNode processMultiChildNode(MultiChildNode node, NodeGroupContext context) {
     MultiChildNode newNode = (MultiChildNode) node.clone();
     List<PlanNode> visitedChildren = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index c608e09e1b..8df3a16804 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
@@ -54,7 +55,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesSourceNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
 
 import java.util.ArrayList;
@@ -594,6 +595,23 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
     return newRoot;
   }
 
+  @Override
+  public PlanNode visitGroupByTag(GroupByTagNode root, DistributionPlanContext context) {
+    List<SeriesAggregationSourceNode> sources = splitAggregationSourceByPartition(root, context);
+    Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup =
+        sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
+
+    boolean containsSlidingWindow =
+        root.getChildren().size() == 1
+            && root.getChildren().get(0) instanceof SlidingWindowAggregationNode;
+
+    // TODO: use 2 phase aggregation to optimize the query
+    return containsSlidingWindow
+        ? groupSourcesForGroupByTagWithSlidingWindow(
+            root, (SlidingWindowAggregationNode) root.getChildren().get(0), sourceGroup, context)
+        : groupSourcesForGroupByTag(root, sourceGroup, context);
+  }
+
   // If the Aggregation Query contains value filter, we need to use the naive query plan
   // for it. That is, do the raw data query and then do the aggregation operation.
   // Currently, the method to judge whether the query should use naive query plan is whether
@@ -709,8 +727,9 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
       GroupByLevelNode handle = (GroupByLevelNode) node;
       // Check every OutputColumn of GroupByLevelNode and set the Expression of corresponding
       // AggregationDescriptor
-      List<GroupByLevelDescriptor> descriptorList = new ArrayList<>();
-      for (GroupByLevelDescriptor originalDescriptor : handle.getGroupByLevelDescriptors()) {
+      List<CrossSeriesAggregationDescriptor> descriptorList = new ArrayList<>();
+      for (CrossSeriesAggregationDescriptor originalDescriptor :
+          handle.getGroupByLevelDescriptors()) {
         Set<Expression> descriptorExpressions = new HashSet<>();
         for (String childColumn : childrenOutputColumns) {
           // If this condition matched, the childColumn should come from GroupByLevelNode
@@ -727,7 +746,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
         if (descriptorExpressions.size() == 0) {
           continue;
         }
-        GroupByLevelDescriptor descriptor = originalDescriptor.deepClone();
+        CrossSeriesAggregationDescriptor descriptor = originalDescriptor.deepClone();
         descriptor.setStep(level == 0 ? AggregationStep.FINAL : AggregationStep.INTERMEDIATE);
         descriptor.setInputExpressions(new ArrayList<>(descriptorExpressions));
         descriptorList.add(descriptor);
@@ -738,6 +757,70 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
     }
   }
 
+  private GroupByTagNode groupSourcesForGroupByTagWithSlidingWindow(
+      GroupByTagNode root,
+      SlidingWindowAggregationNode slidingWindowNode,
+      Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup,
+      DistributionPlanContext context) {
+    GroupByTagNode newRoot = (GroupByTagNode) root.clone();
+    sourceGroup.forEach(
+        (dataRegion, sourceNodes) -> {
+          SlidingWindowAggregationNode parentOfGroup =
+              (SlidingWindowAggregationNode) slidingWindowNode.clone();
+          parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+          List<AggregationDescriptor> childDescriptors = new ArrayList<>();
+          sourceNodes.forEach(
+              n ->
+                  n.getAggregationDescriptorList()
+                      .forEach(
+                          v -> {
+                            childDescriptors.add(
+                                new AggregationDescriptor(
+                                    v.getAggregationFuncName(),
+                                    AggregationStep.INTERMEDIATE,
+                                    v.getInputExpressions()));
+                          }));
+          parentOfGroup.setAggregationDescriptorList(childDescriptors);
+          if (sourceNodes.size() == 1) {
+            parentOfGroup.addChild(sourceNodes.get(0));
+          } else {
+            PlanNode timeJoinNode =
+                new TimeJoinNode(
+                    context.queryContext.getQueryId().genPlanNodeId(), root.getScanOrder());
+            sourceNodes.forEach(timeJoinNode::addChild);
+            parentOfGroup.addChild(timeJoinNode);
+          }
+          newRoot.addChild(parentOfGroup);
+        });
+    return newRoot;
+  }
+
+  private GroupByTagNode groupSourcesForGroupByTag(
+      GroupByTagNode root,
+      Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup,
+      DistributionPlanContext context) {
+    GroupByTagNode newRoot = (GroupByTagNode) root.clone();
+    final boolean[] addParent = {false};
+    sourceGroup.forEach(
+        (dataRegion, sourceNodes) -> {
+          if (sourceNodes.size() == 1) {
+            newRoot.addChild(sourceNodes.get(0));
+          } else {
+            if (!addParent[0]) {
+              sourceNodes.forEach(newRoot::addChild);
+              addParent[0] = true;
+            } else {
+              PlanNode timeJoinNode =
+                  new TimeJoinNode(
+                      context.queryContext.getQueryId().genPlanNodeId(), root.getScanOrder());
+              sourceNodes.forEach(timeJoinNode::addChild);
+              newRoot.addChild(timeJoinNode);
+            }
+          }
+        });
+    return newRoot;
+  }
+
   // TODO: (xingtanzjr) need to confirm the logic when processing UDF
   private boolean isAggColumnMatchExpression(String columnName, Expression expression) {
     if (columnName == null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
index 508730a854..e803180818 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
@@ -41,6 +42,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNo
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
 
 import org.apache.commons.lang3.Validate;
 
@@ -48,6 +50,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
 public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter.GraphContext> {
 
@@ -175,6 +178,34 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter
     return render(node, boxValue, context);
   }
 
+  @Override
+  public List<String> visitGroupByTag(GroupByTagNode node, GraphContext context) {
+    List<String> boxValue = new ArrayList<>();
+    boxValue.add(String.format("GroupByTag-%s", node.getPlanNodeId().getId()));
+    boxValue.add(String.format("Tag keys: %s", node.getTagKeys()));
+    int bucketIdx = 0;
+    for (Entry<List<String>, List<CrossSeriesAggregationDescriptor>> entry :
+        node.getTagValuesToAggregationDescriptors().entrySet()) {
+      boxValue.add(String.format("Bucket-%d: %s", bucketIdx, entry.getKey()));
+      int aggregatorIdx = 0;
+      for (CrossSeriesAggregationDescriptor descriptor : entry.getValue()) {
+        if (descriptor == null) {
+          boxValue.add(String.format("    Aggregator-%d: NULL", aggregatorIdx));
+        } else {
+          boxValue.add(
+              String.format(
+                  "    Aggregator-%d: %s, %s",
+                  aggregatorIdx, descriptor.getAggregationType(), descriptor.getStep()));
+          boxValue.add(String.format("      Output: %s", descriptor.getOutputColumnNames()));
+          boxValue.add(String.format("      Input: %s", descriptor.getInputExpressions()));
+        }
+        aggregatorIdx += 1;
+      }
+      bucketIdx += 1;
+    }
+    return render(node, boxValue, context);
+  }
+
   @Override
   public List<String> visitSlidingWindowAggregation(
       SlidingWindowAggregationNode node, GraphContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index 054d4b21de..dd102898d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -51,6 +51,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ProjectNode;
@@ -138,7 +139,8 @@ public enum PlanNodeType {
   PATHS_USING_TEMPLATE_SCAN((short) 54),
   LOAD_TSFILE((short) 55),
   CONSTRUCT_SCHEMA_BLACK_LIST_NODE((short) 56),
-  ROLLBACK_SCHEMA_BLACK_LIST_NODE((short) 57);
+  ROLLBACK_SCHEMA_BLACK_LIST_NODE((short) 57),
+  GROUP_BY_TAG((short) 58);
 
   public static final int BYTES = Short.BYTES;
 
@@ -303,6 +305,8 @@ public enum PlanNodeType {
         return ConstructSchemaBlackListNode.deserialize(buffer);
       case 57:
         return RollbackSchemaBlackListNode.deserialize(buffer);
+      case 58:
+        return GroupByTagNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index a8cea87284..8b4155cb53 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -49,6 +49,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ProjectNode;
@@ -117,6 +118,10 @@ public abstract class PlanVisitor<R, C> {
     return visitPlan(node, context);
   }
 
+  public R visitGroupByTag(GroupByTagNode node, C context) {
+    return visitPlan(node, context);
+  }
+
   public R visitSlidingWindowAggregation(SlidingWindowAggregationNode node, C context) {
     return visitPlan(node, context);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchScanNode.java
index 3e9db0ac37..d066080b98 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchScanNode.java
@@ -48,19 +48,21 @@ public class SchemaFetchScanNode extends SourceNode {
   private final PartialPath storageGroup;
   private final PathPatternTree patternTree;
   private final Map<Integer, Template> templateMap;
-
+  private final boolean withTags;
   private TRegionReplicaSet schemaRegionReplicaSet;
 
   public SchemaFetchScanNode(
       PlanNodeId id,
       PartialPath storageGroup,
       PathPatternTree patternTree,
-      Map<Integer, Template> templateMap) {
+      Map<Integer, Template> templateMap,
+      boolean withTags) {
     super(id);
     this.storageGroup = storageGroup;
     this.patternTree = patternTree;
     this.patternTree.constructTree();
     this.templateMap = templateMap;
+    this.withTags = withTags;
   }
 
   public PartialPath getStorageGroup() {
@@ -85,7 +87,8 @@ public class SchemaFetchScanNode extends SourceNode {
 
   @Override
   public PlanNode clone() {
-    return new SchemaFetchScanNode(getPlanNodeId(), storageGroup, patternTree, templateMap);
+    return new SchemaFetchScanNode(
+        getPlanNodeId(), storageGroup, patternTree, templateMap, withTags);
   }
 
   @Override
@@ -117,6 +120,7 @@ public class SchemaFetchScanNode extends SourceNode {
     for (Template template : templateMap.values()) {
       template.serialize(byteBuffer);
     }
+    ReadWriteIOUtils.write(withTags, byteBuffer);
   }
 
   @Override
@@ -128,6 +132,7 @@ public class SchemaFetchScanNode extends SourceNode {
     for (Template template : templateMap.values()) {
       template.serialize(stream);
     }
+    ReadWriteIOUtils.write(withTags, stream);
   }
 
   public static SchemaFetchScanNode deserialize(ByteBuffer byteBuffer) {
@@ -142,9 +147,9 @@ public class SchemaFetchScanNode extends SourceNode {
       template.deserialize(byteBuffer);
       templateMap.put(template.getId(), template);
     }
-
+    boolean withTags = ReadWriteIOUtils.readBool(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new SchemaFetchScanNode(planNodeId, storageGroup, patternTree, templateMap);
+    return new SchemaFetchScanNode(planNodeId, storageGroup, patternTree, templateMap, withTags);
   }
 
   @Override
@@ -167,4 +172,8 @@ public class SchemaFetchScanNode extends SourceNode {
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitSchemaFetchScan(this, context);
   }
+
+  public boolean isWithTags() {
+    return withTags;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
index 3c7c653b70..0fa39ff184 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -56,7 +56,7 @@ public class GroupByLevelNode extends MultiChildNode {
 
   // The list of aggregate descriptors
   // each GroupByLevelDescriptor will be output as one or two column of result TsBlock
-  protected List<GroupByLevelDescriptor> groupByLevelDescriptors;
+  protected List<CrossSeriesAggregationDescriptor> groupByLevelDescriptors;
 
   // The parameter of `group by time`.
   // Its value will be null if there is no `group by time` clause.
@@ -67,7 +67,7 @@ public class GroupByLevelNode extends MultiChildNode {
   public GroupByLevelNode(
       PlanNodeId id,
       List<PlanNode> children,
-      List<GroupByLevelDescriptor> groupByLevelDescriptors,
+      List<CrossSeriesAggregationDescriptor> groupByLevelDescriptors,
       GroupByTimeParameter groupByTimeParameter,
       Ordering scanOrder) {
     super(id, children);
@@ -78,7 +78,7 @@ public class GroupByLevelNode extends MultiChildNode {
 
   public GroupByLevelNode(
       PlanNodeId id,
-      List<GroupByLevelDescriptor> groupByLevelDescriptors,
+      List<CrossSeriesAggregationDescriptor> groupByLevelDescriptors,
       GroupByTimeParameter groupByTimeParameter,
       Ordering scanOrder) {
     super(id);
@@ -108,11 +108,12 @@ public class GroupByLevelNode extends MultiChildNode {
         getPlanNodeId(), getGroupByLevelDescriptors(), this.groupByTimeParameter, this.scanOrder);
   }
 
-  public List<GroupByLevelDescriptor> getGroupByLevelDescriptors() {
+  public List<CrossSeriesAggregationDescriptor> getGroupByLevelDescriptors() {
     return groupByLevelDescriptors;
   }
 
-  public void setGroupByLevelDescriptors(List<GroupByLevelDescriptor> groupByLevelDescriptors) {
+  public void setGroupByLevelDescriptors(
+      List<CrossSeriesAggregationDescriptor> groupByLevelDescriptors) {
     this.groupByLevelDescriptors = groupByLevelDescriptors;
   }
 
@@ -133,7 +134,7 @@ public class GroupByLevelNode extends MultiChildNode {
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.GROUP_BY_LEVEL.serialize(byteBuffer);
     ReadWriteIOUtils.write(groupByLevelDescriptors.size(), byteBuffer);
-    for (GroupByLevelDescriptor groupByLevelDescriptor : groupByLevelDescriptors) {
+    for (CrossSeriesAggregationDescriptor groupByLevelDescriptor : groupByLevelDescriptors) {
       groupByLevelDescriptor.serialize(byteBuffer);
     }
     if (groupByTimeParameter == null) {
@@ -149,7 +150,7 @@ public class GroupByLevelNode extends MultiChildNode {
   protected void serializeAttributes(DataOutputStream stream) throws IOException {
     PlanNodeType.GROUP_BY_LEVEL.serialize(stream);
     ReadWriteIOUtils.write(groupByLevelDescriptors.size(), stream);
-    for (GroupByLevelDescriptor groupByLevelDescriptor : groupByLevelDescriptors) {
+    for (CrossSeriesAggregationDescriptor groupByLevelDescriptor : groupByLevelDescriptors) {
       groupByLevelDescriptor.serialize(stream);
     }
     if (groupByTimeParameter == null) {
@@ -163,9 +164,9 @@ public class GroupByLevelNode extends MultiChildNode {
 
   public static GroupByLevelNode deserialize(ByteBuffer byteBuffer) {
     int descriptorSize = ReadWriteIOUtils.readInt(byteBuffer);
-    List<GroupByLevelDescriptor> groupByLevelDescriptors = new ArrayList<>();
+    List<CrossSeriesAggregationDescriptor> groupByLevelDescriptors = new ArrayList<>();
     while (descriptorSize > 0) {
-      groupByLevelDescriptors.add(GroupByLevelDescriptor.deserialize(byteBuffer));
+      groupByLevelDescriptors.add(CrossSeriesAggregationDescriptor.deserialize(byteBuffer));
       descriptorSize--;
     }
     byte isNull = ReadWriteIOUtils.readByte(byteBuffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
new file mode 100644
index 0000000000..e0ef548508
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
+
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.apache.commons.lang3.Validate;
+
+import javax.annotation.Nullable;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class GroupByTagNode extends MultiChildNode {
+
+  private final List<String> tagKeys;
+  private final Map<List<String>, List<CrossSeriesAggregationDescriptor>>
+      tagValuesToAggregationDescriptors;
+  private final List<String> outputColumnNames;
+
+  // The parameter of `group by time`.
+  // Its value will be null if there is no `group by time` clause.
+  @Nullable protected GroupByTimeParameter groupByTimeParameter;
+
+  protected Ordering scanOrder;
+
+  public GroupByTagNode(
+      PlanNodeId id,
+      List<PlanNode> children,
+      @Nullable GroupByTimeParameter groupByTimeParameter,
+      Ordering scanOrder,
+      List<String> tagKeys,
+      Map<List<String>, List<CrossSeriesAggregationDescriptor>> tagValuesToAggregationDescriptors,
+      List<String> outputColumnNames) {
+    super(id, children);
+    this.groupByTimeParameter = groupByTimeParameter;
+    this.scanOrder = Validate.notNull(scanOrder);
+    this.tagKeys = Validate.notNull(tagKeys);
+    this.tagValuesToAggregationDescriptors = Validate.notNull(tagValuesToAggregationDescriptors);
+    this.outputColumnNames = Validate.notNull(outputColumnNames);
+  }
+
+  public GroupByTagNode(
+      PlanNodeId id,
+      @Nullable GroupByTimeParameter groupByTimeParameter,
+      Ordering scanOrder,
+      List<String> tagKeys,
+      Map<List<String>, List<CrossSeriesAggregationDescriptor>> tagValuesToAggregationDescriptors,
+      List<String> outputColumnNames) {
+    super(id);
+    this.groupByTimeParameter = groupByTimeParameter;
+    this.scanOrder = Validate.notNull(scanOrder);
+    this.tagKeys = Validate.notNull(tagKeys);
+    this.tagValuesToAggregationDescriptors = Validate.notNull(tagValuesToAggregationDescriptors);
+    this.outputColumnNames = Validate.notNull(outputColumnNames);
+  }
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return children;
+  }
+
+  @Override
+  public void addChild(PlanNode child) {
+    this.children.add(child);
+  }
+
+  @Override
+  public PlanNode clone() {
+    // TODO: consider doing a deep copy of the descriptor map and lists here
+    return new GroupByTagNode(
+        getPlanNodeId(),
+        this.groupByTimeParameter,
+        this.scanOrder,
+        this.tagKeys,
+        this.tagValuesToAggregationDescriptors,
+        this.outputColumnNames);
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return CHILD_COUNT_NO_LIMIT;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    List<String> ret = new ArrayList<>(tagKeys);
+    ret.addAll(outputColumnNames);
+    return ret;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitGroupByTag(this, context);
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    // Plan type.
+    PlanNodeType.GROUP_BY_TAG.serialize(byteBuffer);
+
+    // Tag keys.
+    ReadWriteIOUtils.writeStringList(tagKeys, byteBuffer);
+
+    // Tag values to aggregation descriptors.
+    ReadWriteIOUtils.write(tagValuesToAggregationDescriptors.size(), byteBuffer);
+    for (Entry<List<String>, List<CrossSeriesAggregationDescriptor>> entry :
+        tagValuesToAggregationDescriptors.entrySet()) {
+      ReadWriteIOUtils.writeStringList(entry.getKey(), byteBuffer);
+      ReadWriteIOUtils.write(entry.getValue().size(), byteBuffer);
+      for (CrossSeriesAggregationDescriptor aggregationDescriptor : entry.getValue()) {
+        if (aggregationDescriptor == null) {
+          ReadWriteIOUtils.write((byte) 0, byteBuffer);
+        } else {
+          ReadWriteIOUtils.write((byte) 1, byteBuffer);
+          aggregationDescriptor.serialize(byteBuffer);
+        }
+      }
+    }
+
+    // Output column names.
+    ReadWriteIOUtils.writeStringList(outputColumnNames, byteBuffer);
+
+    // Group by time parameter.
+    if (groupByTimeParameter == null) {
+      ReadWriteIOUtils.write((byte) 0, byteBuffer);
+    } else {
+      ReadWriteIOUtils.write((byte) 1, byteBuffer);
+      groupByTimeParameter.serialize(byteBuffer);
+    }
+
+    // Scan order.
+    ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer);
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    // Plan type.
+    PlanNodeType.GROUP_BY_TAG.serialize(stream);
+
+    // Tag keys.
+    ReadWriteIOUtils.writeStringList(tagKeys, stream);
+
+    // Tag values to aggregation descriptors.
+    ReadWriteIOUtils.write(tagValuesToAggregationDescriptors.size(), stream);
+    for (Entry<List<String>, List<CrossSeriesAggregationDescriptor>> entry :
+        tagValuesToAggregationDescriptors.entrySet()) {
+      ReadWriteIOUtils.writeStringList(entry.getKey(), stream);
+      ReadWriteIOUtils.write(entry.getValue().size(), stream);
+      for (CrossSeriesAggregationDescriptor aggregationDescriptor : entry.getValue()) {
+        if (aggregationDescriptor == null) {
+          ReadWriteIOUtils.write((byte) 0, stream);
+        } else {
+          ReadWriteIOUtils.write((byte) 1, stream);
+          aggregationDescriptor.serialize(stream);
+        }
+      }
+    }
+
+    // Output column names.
+    ReadWriteIOUtils.writeStringList(outputColumnNames, stream);
+
+    // Group by time parameter.
+    if (groupByTimeParameter == null) {
+      ReadWriteIOUtils.write((byte) 0, stream);
+    } else {
+      ReadWriteIOUtils.write((byte) 1, stream);
+      groupByTimeParameter.serialize(stream);
+    }
+
+    // Scan order.
+    ReadWriteIOUtils.write(scanOrder.ordinal(), stream);
+  }
+
+  @Nullable
+  public GroupByTimeParameter getGroupByTimeParameter() {
+    return groupByTimeParameter;
+  }
+
+  public Ordering getScanOrder() {
+    return scanOrder;
+  }
+
+  public List<String> getTagKeys() {
+    return tagKeys;
+  }
+
+  public Map<List<String>, List<CrossSeriesAggregationDescriptor>>
+      getTagValuesToAggregationDescriptors() {
+    return tagValuesToAggregationDescriptors;
+  }
+
+  public static GroupByTagNode deserialize(ByteBuffer byteBuffer) {
+    // Tag keys.
+    List<String> tagKeys = ReadWriteIOUtils.readStringList(byteBuffer);
+
+    // Tag values to aggregation descriptors.
+    int numOfEntries = ReadWriteIOUtils.readInt(byteBuffer);
+    Map<List<String>, List<CrossSeriesAggregationDescriptor>> tagValuesToAggregationDescriptors =
+        new HashMap<>();
+    while (numOfEntries > 0) {
+      List<String> tagValues = ReadWriteIOUtils.readStringList(byteBuffer);
+      List<CrossSeriesAggregationDescriptor> aggregationDescriptors = new ArrayList<>();
+      int numOfAggregationDescriptors = ReadWriteIOUtils.readInt(byteBuffer);
+      while (numOfAggregationDescriptors > 0) {
+        byte isNotNull = ReadWriteIOUtils.readByte(byteBuffer);
+        // Keep a null placeholder so the list size matches the serialized count;
+        // dropping nulls would break serialize/deserialize round-trip equality.
+        aggregationDescriptors.add(
+            isNotNull == 1 ? CrossSeriesAggregationDescriptor.deserialize(byteBuffer) : null);
+        numOfAggregationDescriptors -= 1;
+      }
+      tagValuesToAggregationDescriptors.put(tagValues, aggregationDescriptors);
+      numOfEntries -= 1;
+    }
+
+    // Output column names.
+    List<String> outputColumnNames = ReadWriteIOUtils.readStringList(byteBuffer);
+
+    // Group by time parameter.
+    byte hasGroupByTimeParameter = ReadWriteIOUtils.readByte(byteBuffer);
+    GroupByTimeParameter groupByTimeParameter = null;
+    if (hasGroupByTimeParameter == 1) {
+      groupByTimeParameter = GroupByTimeParameter.deserialize(byteBuffer);
+    }
+
+    // Scan order.
+    Ordering scanOrder = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new GroupByTagNode(
+        planNodeId,
+        groupByTimeParameter,
+        scanOrder,
+        tagKeys,
+        tagValuesToAggregationDescriptors,
+        outputColumnNames);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    GroupByTagNode that = (GroupByTagNode) o;
+    return Objects.equals(groupByTimeParameter, that.groupByTimeParameter)
+        && scanOrder == that.scanOrder
+        && Objects.equals(tagKeys, that.tagKeys)
+        && Objects.equals(tagValuesToAggregationDescriptors, that.tagValuesToAggregationDescriptors)
+        && Objects.equals(outputColumnNames, that.outputColumnNames);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(
+        super.hashCode(),
+        groupByTimeParameter,
+        scanOrder,
+        tagKeys,
+        tagValuesToAggregationDescriptors,
+        outputColumnNames);
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "GroupByTagNode-%s: Output: %s, Input: %s",
+        getPlanNodeId(),
+        getOutputColumnNames(),
+        tagValuesToAggregationDescriptors.values().stream()
+            .flatMap(
+                list -> list.stream().map(CrossSeriesAggregationDescriptor::getInputExpressions))
+            .collect(Collectors.toList()));
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
index daf7955522..464ee03130 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
@@ -137,10 +137,10 @@ public class AggregationDescriptor {
           outputAggregationNames.add(AggregationType.MAX_TIME.name().toLowerCase());
           break;
         default:
-          outputAggregationNames.add(aggregationFuncName);
+          outputAggregationNames.add(aggregationFuncName.toLowerCase());
       }
     } else {
-      outputAggregationNames.add(aggregationFuncName);
+      outputAggregationNames.add(aggregationFuncName.toLowerCase());
     }
     return outputAggregationNames;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByLevelDescriptor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/CrossSeriesAggregationDescriptor.java
similarity index 85%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByLevelDescriptor.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/CrossSeriesAggregationDescriptor.java
index 5fd7162a13..d40df1f33e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByLevelDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/CrossSeriesAggregationDescriptor.java
@@ -28,11 +28,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-public class GroupByLevelDescriptor extends AggregationDescriptor {
+public class CrossSeriesAggregationDescriptor extends AggregationDescriptor {
 
   private final Expression outputExpression;
 
-  public GroupByLevelDescriptor(
+  public CrossSeriesAggregationDescriptor(
       String aggregationFuncName,
       AggregationStep step,
       List<Expression> inputExpressions,
@@ -41,7 +41,7 @@ public class GroupByLevelDescriptor extends AggregationDescriptor {
     this.outputExpression = outputExpression;
   }
 
-  public GroupByLevelDescriptor(
+  public CrossSeriesAggregationDescriptor(
       AggregationDescriptor aggregationDescriptor, Expression outputExpression) {
     super(aggregationDescriptor);
     this.outputExpression = outputExpression;
@@ -66,8 +66,8 @@ public class GroupByLevelDescriptor extends AggregationDescriptor {
     return inputColumnNameToExpressionMap;
   }
 
-  public GroupByLevelDescriptor deepClone() {
-    return new GroupByLevelDescriptor(
+  public CrossSeriesAggregationDescriptor deepClone() {
+    return new CrossSeriesAggregationDescriptor(
         this.getAggregationFuncName(),
         this.getStep(),
         this.getInputExpressions(),
@@ -86,10 +86,10 @@ public class GroupByLevelDescriptor extends AggregationDescriptor {
     Expression.serialize(outputExpression, stream);
   }
 
-  public static GroupByLevelDescriptor deserialize(ByteBuffer byteBuffer) {
+  public static CrossSeriesAggregationDescriptor deserialize(ByteBuffer byteBuffer) {
     AggregationDescriptor aggregationDescriptor = AggregationDescriptor.deserialize(byteBuffer);
     Expression outputExpression = Expression.deserialize(byteBuffer);
-    return new GroupByLevelDescriptor(aggregationDescriptor, outputExpression);
+    return new CrossSeriesAggregationDescriptor(aggregationDescriptor, outputExpression);
   }
 
   @Override
@@ -103,7 +103,7 @@ public class GroupByLevelDescriptor extends AggregationDescriptor {
     if (!super.equals(o)) {
       return false;
     }
-    GroupByLevelDescriptor that = (GroupByLevelDescriptor) o;
+    CrossSeriesAggregationDescriptor that = (CrossSeriesAggregationDescriptor) o;
     return Objects.equals(outputExpression, that.outputExpression);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByTagComponent.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByTagComponent.java
new file mode 100644
index 0000000000..32855caec8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByTagComponent.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.component;
+
+import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
+
+import org.apache.commons.lang3.Validate;
+
+import java.util.List;
+
+public class GroupByTagComponent extends StatementNode {
+  private final List<String> tagKeys;
+
+  public GroupByTagComponent(List<String> tagKeys) {
+    this.tagKeys = Validate.notNull(tagKeys);
+  }
+
+  public List<String> getTagKeys() {
+    return tagKeys;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
index 96f80eed10..a4011d0ed4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.mpp.plan.statement.component.FillComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.FromComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByLevelComponent;
+import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTagComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.HavingCondition;
 import org.apache.iotdb.db.mpp.plan.statement.component.OrderByComponent;
@@ -94,6 +95,9 @@ public class QueryStatement extends Statement {
   // `GROUP BY LEVEL` clause
   protected GroupByLevelComponent groupByLevelComponent;
 
+  // `GROUP BY TAG` clause
+  protected GroupByTagComponent groupByTagComponent;
+
   public QueryStatement() {
     this.statementType = StatementType.QUERY;
   }
@@ -222,6 +226,14 @@ public class QueryStatement extends Statement {
     this.groupByLevelComponent = groupByLevelComponent;
   }
 
+  public GroupByTagComponent getGroupByTagComponent() {
+    return groupByTagComponent;
+  }
+
+  public void setGroupByTagComponent(GroupByTagComponent groupByTagComponent) {
+    this.groupByTagComponent = groupByTagComponent;
+  }
+
   public boolean isLastQuery() {
     return selectComponent.isHasLast();
   }
@@ -234,6 +246,10 @@ public class QueryStatement extends Statement {
     return groupByLevelComponent != null;
   }
 
+  public boolean isGroupByTag() {
+    return groupByTagComponent != null;
+  }
+
   public boolean isGroupByTime() {
     return groupByTimeComponent != null;
   }
@@ -280,6 +296,12 @@ public class QueryStatement extends Statement {
       if (isGroupByLevel() && isAlignByDevice()) {
         throw new SemanticException("group by level does not support align by device now.");
       }
+      if (isGroupByTag() && isAlignByDevice()) {
+        throw new SemanticException("group by tag does not support align by device now.");
+      }
+      if (isGroupByTag() && isGroupByLevel()) {
+        throw new SemanticException("group by level cannot be used together with group by tag");
+      }
       for (ResultColumn resultColumn : selectComponent.getResultColumns()) {
         if (resultColumn.getColumnType() != ResultColumn.ColumnType.AGGREGATION) {
           throw new SemanticException("Raw data and aggregation hybrid query is not supported.");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/SchemaFetchStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/SchemaFetchStatement.java
index 56c5ea7390..f988aa3d70 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/SchemaFetchStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/internal/SchemaFetchStatement.java
@@ -33,11 +33,14 @@ public class SchemaFetchStatement extends Statement {
 
   private final PathPatternTree patternTree;
   private final Map<Integer, Template> templateMap;
+  private final boolean withTags;
 
-  public SchemaFetchStatement(PathPatternTree patternTree, Map<Integer, Template> templateMap) {
+  public SchemaFetchStatement(
+      PathPatternTree patternTree, Map<Integer, Template> templateMap, boolean withTags) {
     super();
     this.patternTree = patternTree;
     this.templateMap = templateMap;
+    this.withTags = withTags;
     setType(StatementType.FETCH_SCHEMA);
   }
 
@@ -58,4 +61,8 @@ public class SchemaFetchStatement extends Statement {
   public List<PartialPath> getPaths() {
     return patternTree.getAllPathPatterns();
   }
+
+  public boolean isWithTags() {
+    return withTags;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
index 4ace0fd496..68fd6ed86e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
@@ -366,7 +366,8 @@ public class ConcatPathOptimizer implements ILogicalOptimizer {
     try {
       for (PartialPath originalPath : originalPaths) {
         List<MeasurementPath> all =
-            IoTDB.schemaProcessor.getMeasurementPathsWithAlias(originalPath, 0, 0, isPrefixMatch)
+            IoTDB.schemaProcessor.getMeasurementPathsWithAlias(
+                    originalPath, 0, 0, isPrefixMatch, false)
                 .left;
         if (all.isEmpty()) {
           throw new LogicalOptimizeException(
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/utils/WildcardsRemover.java b/server/src/main/java/org/apache/iotdb/db/qp/utils/WildcardsRemover.java
index 731566502c..243979d911 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/utils/WildcardsRemover.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/utils/WildcardsRemover.java
@@ -72,7 +72,7 @@ public class WildcardsRemover {
     try {
       Pair<List<MeasurementPath>, Integer> pair =
           IoTDB.schemaProcessor.getMeasurementPathsWithAlias(
-              path, currentLimit, currentOffset, isPrefixMatch);
+              path, currentLimit, currentOffset, isPrefixMatch, false);
       consumed += pair.right;
       currentOffset -= Math.min(currentOffset, pair.right);
       currentLimit -= pair.left.size();
@@ -123,7 +123,11 @@ public class WildcardsRemover {
     return remainingExpressions;
   }
 
-  /** @return should break the loop or not */
+  /**
+   * Check whether the path number is over limit.
+   *
+   * @return should break the loop or not
+   */
   public boolean checkIfPathNumberIsOverLimit(List<ResultColumn> resultColumns)
       throws PathNumOverLimitException {
     if (resultColumns.size()
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
index 2ed740eb2e..a581cdd867 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
@@ -33,15 +33,19 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Collectors;
 
 public class DataNodeSchemaCacheTest {
   DataNodeSchemaCache dataNodeSchemaCache;
+  private Map<String, String> s1TagMap;
 
   @Before
   public void setUp() throws Exception {
     dataNodeSchemaCache = DataNodeSchemaCache.getInstance();
+    s1TagMap = new HashMap<>();
+    s1TagMap.put("k1", "v1");
   }
 
   @After
@@ -67,16 +71,21 @@ public class DataNodeSchemaCacheTest {
                     o ->
                         new SchemaCacheEntry(
                             (MeasurementSchema) o.getMeasurementSchema(),
+                            o.getTagMap(),
                             o.isUnderAlignedEntity())));
     Assert.assertEquals(
         TSDataType.INT32,
         schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s1")).getTsDataType());
+    Assert.assertEquals(
+        s1TagMap, schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s1")).getTagMap());
     Assert.assertEquals(
         TSDataType.FLOAT,
         schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s2")).getTsDataType());
+    Assert.assertNull(schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s2")).getTagMap());
     Assert.assertEquals(
         TSDataType.BOOLEAN,
         schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s3")).getTsDataType());
+    Assert.assertNull(schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s3")).getTagMap());
     Assert.assertEquals(3, dataNodeSchemaCache.estimatedSize());
 
     String[] otherMeasurements = new String[3];
@@ -94,16 +103,20 @@ public class DataNodeSchemaCacheTest {
                     o ->
                         new SchemaCacheEntry(
                             (MeasurementSchema) o.getMeasurementSchema(),
+                            o.getTagMap(),
                             o.isUnderAlignedEntity())));
     Assert.assertEquals(
         TSDataType.BOOLEAN,
         schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s3")).getTsDataType());
+    Assert.assertNull(schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s3")).getTagMap());
     Assert.assertEquals(
         TSDataType.TEXT,
         schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s4")).getTsDataType());
+    Assert.assertNull(schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s4")).getTagMap());
     Assert.assertEquals(
         TSDataType.INT64,
         schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s5")).getTsDataType());
+    Assert.assertNull(schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s5")).getTagMap());
     Assert.assertEquals(5, dataNodeSchemaCache.estimatedSize());
   }
 
@@ -176,21 +189,25 @@ public class DataNodeSchemaCacheTest {
 
   private ISchemaTree generateSchemaTree1() throws IllegalPathException {
     ClusterSchemaTree schemaTree = new ClusterSchemaTree();
-
+    Map<String, String> s1TagMap = new HashMap<>();
+    s1TagMap.put("k1", "v1");
     schemaTree.appendSingleMeasurement(
         new PartialPath("root.sg1.d1.s1"),
         new MeasurementSchema("s1", TSDataType.INT32),
+        s1TagMap,
         null,
         false);
     schemaTree.appendSingleMeasurement(
         new PartialPath("root.sg1.d1.s2"),
         new MeasurementSchema("s2", TSDataType.FLOAT),
         null,
+        null,
         false);
     schemaTree.appendSingleMeasurement(
         new PartialPath("root.sg1.d1.s3"),
         new MeasurementSchema("s3", TSDataType.BOOLEAN),
         null,
+        null,
         false);
 
     return schemaTree;
@@ -203,16 +220,19 @@ public class DataNodeSchemaCacheTest {
         new PartialPath("root.sg1.d1.s3"),
         new MeasurementSchema("s3", TSDataType.BOOLEAN),
         null,
+        null,
         false);
     schemaTree.appendSingleMeasurement(
         new PartialPath("root.sg1.d1.s4"),
         new MeasurementSchema("s4", TSDataType.TEXT),
         null,
+        null,
         false);
     schemaTree.appendSingleMeasurement(
         new PartialPath("root.sg1.d1.s5"),
         new MeasurementSchema("s5", TSDataType.INT64),
         null,
+        null,
         false);
 
     return schemaTree;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGTest.java
index 82072edf31..f420429414 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGTest.java
@@ -92,9 +92,11 @@ public abstract class MTreeBelowSGTest {
       IMTreeBelowSG mtree;
       if (SchemaEngineMode.valueOf(IoTDBDescriptor.getInstance().getConfig().getSchemaEngineMode())
           .equals(SchemaEngineMode.Schema_File)) {
-        mtree = new MTreeBelowSGCachedImpl(root.getStorageGroupNodeByStorageGroupPath(path), 0);
+        mtree =
+            new MTreeBelowSGCachedImpl(root.getStorageGroupNodeByStorageGroupPath(path), null, 0);
       } else {
-        mtree = new MTreeBelowSGMemoryImpl(root.getStorageGroupNodeByStorageGroupPath(path), 0);
+        mtree =
+            new MTreeBelowSGMemoryImpl(root.getStorageGroupNodeByStorageGroupPath(path), null, 0);
       }
       usedMTree.add(mtree);
       return mtree;
@@ -283,7 +285,8 @@ public abstract class MTreeBelowSGTest {
       assertEquals("root.a.d1.s0", result.get(1).getFullPath());
 
       List<MeasurementPath> result2 =
-          storageGroup.getMeasurementPathsWithAlias(new PartialPath("root.a.*.s0"), 0, 0, false)
+          storageGroup.getMeasurementPathsWithAlias(
+                  new PartialPath("root.a.*.s0"), 0, 0, false, false)
               .left;
       result2.sort(Comparator.comparing(MeasurementPath::getFullPath));
       assertEquals(2, result2.size());
@@ -294,7 +297,7 @@ public abstract class MTreeBelowSGTest {
 
       result2 =
           storageGroup.getMeasurementPathsWithAlias(
-                  new PartialPath("root.a.*.temperature"), 0, 0, false)
+                  new PartialPath("root.a.*.temperature"), 0, 0, false, false)
               .left;
       result2.sort(Comparator.comparing(MeasurementPath::getFullPath));
       assertEquals(2, result2.size());
@@ -302,7 +305,8 @@ public abstract class MTreeBelowSGTest {
       assertEquals("root.a.d1.temperature", result2.get(1).getFullPathWithAlias());
 
       Pair<List<MeasurementPath>, Integer> result3 =
-          storageGroup.getMeasurementPathsWithAlias(new PartialPath("root.a.**"), 2, 0, false);
+          storageGroup.getMeasurementPathsWithAlias(
+              new PartialPath("root.a.**"), 2, 0, false, false);
       assertEquals(2, result3.left.size());
       assertEquals(2, result3.right.intValue());
 
@@ -516,7 +520,7 @@ public abstract class MTreeBelowSGTest {
     assertEquals(
         2,
         storageGroup
-            .getMeasurementPathsWithAlias(new PartialPath("root.**"), 0, 0, false)
+            .getMeasurementPathsWithAlias(new PartialPath("root.**"), 0, 0, false, false)
             .left
             .size());
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/path/MeasurementPathTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/path/MeasurementPathTest.java
index d4cb371614..01e5f1c3d7 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/path/MeasurementPathTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/path/MeasurementPathTest.java
@@ -16,18 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.iotdb.db.metadata.path;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+
 public class MeasurementPathTest {
 
   @Test
@@ -43,4 +50,70 @@ public class MeasurementPathTest {
     Assert.assertEquals(rawPath.getMeasurementSchema(), newPath.getMeasurementSchema());
     Assert.assertEquals(rawPath.isUnderAlignedEntity(), newPath.isUnderAlignedEntity());
   }
+
+  @Test
+  public void testMeasurementPathSerde() throws IllegalPathException, IOException {
+    IMeasurementSchema schema = new MeasurementSchema("s1", TSDataType.TEXT);
+    MeasurementPath expectedPath =
+        new MeasurementPath(new PartialPath("root.sg.d1.s1"), schema, true);
+    expectedPath.setMeasurementAlias("alias_s1");
+    MeasurementPath actualPath = serdeWithByteBuffer(expectedPath);
+    assertMeasurementPathEquals(actualPath, expectedPath);
+    actualPath = serdeWithStream(expectedPath);
+    assertMeasurementPathEquals(actualPath, expectedPath);
+
+    HashMap<String, String> tagMap = new HashMap<>();
+    tagMap.put("k1", "v1");
+    expectedPath.setTagMap(tagMap);
+    actualPath = serdeWithByteBuffer(expectedPath);
+    assertMeasurementPathEquals(actualPath, expectedPath);
+    actualPath = serdeWithStream(expectedPath);
+    assertMeasurementPathEquals(actualPath, expectedPath);
+  }
+
+  @Test
+  public void testCloneAndCopy() throws IllegalPathException {
+    IMeasurementSchema schema = new MeasurementSchema("s1", TSDataType.TEXT);
+    MeasurementPath expectedPath =
+        new MeasurementPath(new PartialPath("root.sg.d1.s1"), schema, true);
+    expectedPath.setMeasurementAlias("alias_s1");
+
+    MeasurementPath actualPath = expectedPath.clone();
+    assertMeasurementPathEquals(actualPath, expectedPath);
+    actualPath = (MeasurementPath) expectedPath.copy();
+    assertMeasurementPathEquals(actualPath, expectedPath);
+
+    HashMap<String, String> tagMap = new HashMap<>();
+    tagMap.put("k1", "v1");
+    expectedPath.setTagMap(tagMap);
+    actualPath = expectedPath.clone();
+    assertMeasurementPathEquals(actualPath, expectedPath);
+    actualPath = (MeasurementPath) expectedPath.copy();
+    assertMeasurementPathEquals(actualPath, expectedPath);
+  }
+
+  private MeasurementPath serdeWithByteBuffer(MeasurementPath origin) {
+    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+    origin.serialize(byteBuffer);
+    byteBuffer.flip();
+    Assert.assertEquals((byte) 0, ReadWriteIOUtils.readByte(byteBuffer));
+    return MeasurementPath.deserialize(byteBuffer);
+  }
+
+  private MeasurementPath serdeWithStream(MeasurementPath origin) throws IOException {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+    origin.serialize(dataOutputStream);
+    ByteBuffer byteBuffer = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
+    Assert.assertEquals((byte) 0, ReadWriteIOUtils.readByte(byteBuffer));
+    return MeasurementPath.deserialize(byteBuffer);
+  }
+
+  private void assertMeasurementPathEquals(MeasurementPath actual, MeasurementPath expected) {
+    Assert.assertEquals(expected, actual);
+    Assert.assertEquals(expected.isUnderAlignedEntity(), actual.isUnderAlignedEntity());
+    Assert.assertEquals(expected.getTagMap(), actual.getTagMap());
+    Assert.assertEquals(expected.getMeasurementSchema(), actual.getMeasurementSchema());
+    Assert.assertEquals(expected.getMeasurementAlias(), actual.getMeasurementAlias());
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index 6dc335605d..50490364f7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -932,7 +932,12 @@ public class OperatorMemoryTest {
 
       SchemaFetchScanOperator operator =
           new SchemaFetchScanOperator(
-              planNodeId, fragmentInstanceContext.getOperatorContexts().get(0), null, null, null);
+              planNodeId,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              null,
+              null,
+              null,
+              false);
 
       assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxPeekMemory());
       assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxReturnSize());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperatorTest.java
index ca2dd9cff2..6872558b18 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperatorTest.java
@@ -74,7 +74,8 @@ public class SchemaFetchScanOperatorTest {
     patternTree.constructTree();
 
     SchemaFetchScanOperator schemaFetchScanOperator =
-        new SchemaFetchScanOperator(null, null, patternTree, Collections.emptyMap(), schemaRegion);
+        new SchemaFetchScanOperator(
+            null, null, patternTree, Collections.emptyMap(), schemaRegion, false);
 
     Assert.assertTrue(schemaFetchScanOperator.hasNext());
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AggregationDescriptorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AggregationDescriptorTest.java
index 9d9c6d325d..25d131e50b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AggregationDescriptorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AggregationDescriptorTest.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
@@ -44,7 +44,8 @@ import java.util.stream.Collectors;
 public class AggregationDescriptorTest {
 
   private static final List<AggregationDescriptor> aggregationDescriptorList = new ArrayList<>();
-  private static final List<GroupByLevelDescriptor> groupByLevelDescriptorList = new ArrayList<>();
+  private static final List<CrossSeriesAggregationDescriptor> groupByLevelDescriptorList =
+      new ArrayList<>();
 
   public static final Map<String, PartialPath> pathMap = new HashMap<>();
 
@@ -91,7 +92,7 @@ public class AggregationDescriptorTest {
             Collections.singletonList(new TimeSeriesOperand(pathMap.get("root.sg.d1.s1")))));
 
     groupByLevelDescriptorList.add(
-        new GroupByLevelDescriptor(
+        new CrossSeriesAggregationDescriptor(
             AggregationType.COUNT.name().toLowerCase(),
             AggregationStep.FINAL,
             Arrays.asList(
@@ -99,7 +100,7 @@ public class AggregationDescriptorTest {
                 new TimeSeriesOperand(pathMap.get("root.sg.d1.s1"))),
             new TimeSeriesOperand(pathMap.get("root.sg.*.s1"))));
     groupByLevelDescriptorList.add(
-        new GroupByLevelDescriptor(
+        new CrossSeriesAggregationDescriptor(
             AggregationType.AVG.name().toLowerCase(),
             AggregationStep.FINAL,
             Arrays.asList(
@@ -107,7 +108,7 @@ public class AggregationDescriptorTest {
                 new TimeSeriesOperand(pathMap.get("root.sg.d2.s1"))),
             new TimeSeriesOperand(pathMap.get("root.sg.*.s1"))));
     groupByLevelDescriptorList.add(
-        new GroupByLevelDescriptor(
+        new CrossSeriesAggregationDescriptor(
             AggregationType.COUNT.name().toLowerCase(),
             AggregationStep.INTERMEDIATE,
             Arrays.asList(
@@ -115,7 +116,7 @@ public class AggregationDescriptorTest {
                 new TimeSeriesOperand(pathMap.get("root.sg.d1.s1"))),
             new TimeSeriesOperand(pathMap.get("root.sg.*.s1"))));
     groupByLevelDescriptorList.add(
-        new GroupByLevelDescriptor(
+        new CrossSeriesAggregationDescriptor(
             AggregationType.AVG.name().toLowerCase(),
             AggregationStep.INTERMEDIATE,
             Arrays.asList(
@@ -168,7 +169,7 @@ public class AggregationDescriptorTest {
     Assert.assertEquals(
         expectedOutputColumnNames,
         groupByLevelDescriptorList.stream()
-            .map(GroupByLevelDescriptor::getOutputColumnNames)
+            .map(CrossSeriesAggregationDescriptor::getOutputColumnNames)
             .flatMap(List::stream)
             .distinct()
             .collect(Collectors.toList()));
@@ -193,7 +194,7 @@ public class AggregationDescriptorTest {
     Assert.assertEquals(
         expectedInputColumnNames,
         groupByLevelDescriptorList.stream()
-            .map(GroupByLevelDescriptor::getInputColumnNamesList)
+            .map(CrossSeriesAggregationDescriptor::getInputColumnNamesList)
             .collect(Collectors.toList()));
   }
 
@@ -237,7 +238,7 @@ public class AggregationDescriptorTest {
     Assert.assertEquals(
         expectedMapList,
         groupByLevelDescriptorList.stream()
-            .map(GroupByLevelDescriptor::getInputColumnCandidateMap)
+            .map(CrossSeriesAggregationDescriptor::getInputColumnCandidateMap)
             .collect(Collectors.toList()));
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
index 895c4701d6..02d4643fe5 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
@@ -611,7 +611,7 @@ public class AnalyzeTest {
     assertEquals(
         expectedAnalysis.getAggregationExpressions(), actualAnalysis.getAggregationExpressions());
     assertEquals(
-        expectedAnalysis.getGroupByLevelExpressions(), actualAnalysis.getGroupByLevelExpressions());
+        expectedAnalysis.getCrossGroupByExpressions(), actualAnalysis.getCrossGroupByExpressions());
   }
 
   private void alignByDeviceAnalysisEqualTest(Analysis actualAnalysis, Analysis expectedAnalysis) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java
similarity index 100%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
similarity index 92%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
index 79208ae4f6..1a121354ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
@@ -49,6 +49,11 @@ public class FakeSchemaFetcherImpl implements ISchemaFetcher {
     return schemaTree;
   }
 
+  @Override
+  public ISchemaTree fetchSchemaWithTags(PathPatternTree patternTree) {
+    return fetchSchema(patternTree);
+  }
+
   @Override
   public ISchemaTree fetchSchemaWithAutoCreate(
       PartialPath devicePath,
@@ -72,12 +77,16 @@ public class FakeSchemaFetcherImpl implements ISchemaFetcher {
 
     SchemaMeasurementNode s1 =
         new SchemaMeasurementNode("s1", new MeasurementSchema("s1", TSDataType.INT32));
+    s1.setTagMap(Collections.singletonMap("key1", "value1"));
     SchemaMeasurementNode s2 =
         new SchemaMeasurementNode("s2", new MeasurementSchema("s2", TSDataType.DOUBLE));
+    s2.setTagMap(Collections.singletonMap("key1", "value1"));
     SchemaMeasurementNode s3 =
         new SchemaMeasurementNode("s3", new MeasurementSchema("s3", TSDataType.BOOLEAN));
+    s3.setTagMap(Collections.singletonMap("key1", "value2"));
     SchemaMeasurementNode s4 =
         new SchemaMeasurementNode("s4", new MeasurementSchema("s4", TSDataType.TEXT));
+    s4.setTagMap(Collections.singletonMap("key2", "value1"));
     s2.setAlias("status");
 
     SchemaEntityNode d1 = new SchemaEntityNode("d1");
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/parser/StatementGeneratorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/parser/StatementGeneratorTest.java
index 32e992ef1d..1113033cfc 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/parser/StatementGeneratorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/parser/StatementGeneratorTest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.plan.parser;
 
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
 
@@ -41,6 +42,21 @@ public class StatementGeneratorTest {
         "SELECT s1, s2 FROM root.sg1.d1 LIMIT 10 OFFSET 10", selectExprList, prefixPaths, 10, 10);
   }
 
+  @Test
+  public void testGroupByTagWithDuplicatedKeys() {
+    try {
+      checkQueryStatement(
+          "SELECT avg(*) FROM root.sg.** GROUP BY TAGS(k1, k2, k1)",
+          Collections.emptyList(),
+          Collections.emptyList(),
+          10,
+          10);
+      Assert.fail();
+    } catch (SemanticException e) {
+      Assert.assertEquals("duplicated key in GROUP BY TAGS: k1", e.getMessage());
+    }
+  }
+
   // TODO: add more tests
 
   private void checkQueryStatement(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java
index 1646afe43f..fe2b9500a0 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java
@@ -27,6 +27,8 @@ import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
 import org.apache.iotdb.db.mpp.plan.analyze.FakePartitionFetcherImpl;
 import org.apache.iotdb.db.mpp.plan.analyze.FakeSchemaFetcherImpl;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.plan.plan.node.PlanNodeDeserializeHelper;
 import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanner;
@@ -44,11 +46,18 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSe
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.AlterTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -60,7 +69,10 @@ import org.junit.Test;
 import java.nio.ByteBuffer;
 import java.time.ZonedDateTime;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.iotdb.db.mpp.plan.plan.QueryLogicalPlanUtil.querySQLs;
@@ -73,8 +85,13 @@ public class LogicalPlannerTest {
   @Test
   public void testQueryPlan() {
     for (String sql : querySQLs) {
-      Assert.assertEquals(sqlToPlanMap.get(sql), parseSQLToPlanNode(sql));
-      System.out.printf("\"%s\" TEST PASSED\n", sql);
+      try {
+        Assert.assertEquals(sqlToPlanMap.get(sql), parseSQLToPlanNode(sql));
+      } catch (Exception e) {
+        System.err.println("Failed to generated logical plan for " + sql);
+        e.printStackTrace();
+        break;
+      }
     }
   }
 
@@ -608,21 +625,71 @@ public class LogicalPlannerTest {
     }
   }
 
-  private PlanNode parseSQLToPlanNode(String sql) {
-    PlanNode planNode = null;
+  @Test
+  public void testGroupByTag() {
+    String sql = "select max_value(s1) from root.** group by tags(key1)";
     try {
-      Statement statement =
-          StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset());
-      MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
-      Analyzer analyzer =
-          new Analyzer(context, new FakePartitionFetcherImpl(), new FakeSchemaFetcherImpl());
-      Analysis analysis = analyzer.analyze(statement);
-      LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
-      planNode = planner.plan(analysis).getRootNode();
+      PlanNode pn = parseSQLToPlanNode(sql);
+      GroupByTagNode root = (GroupByTagNode) pn;
+
+      Assert.assertEquals(Collections.singletonList("key1"), root.getTagKeys());
+
+      Map<List<String>, List<CrossSeriesAggregationDescriptor>> tagValuesToAggregationDescriptors =
+          root.getTagValuesToAggregationDescriptors();
+      Assert.assertEquals(1, tagValuesToAggregationDescriptors.size());
+      Assert.assertEquals(
+          Collections.singleton(Collections.singletonList("value1")),
+          tagValuesToAggregationDescriptors.keySet());
+      List<CrossSeriesAggregationDescriptor> descriptors =
+          tagValuesToAggregationDescriptors.get(Collections.singletonList("value1"));
+      Assert.assertEquals(1, descriptors.size());
+      CrossSeriesAggregationDescriptor descriptor = descriptors.get(0);
+      Assert.assertEquals("s1", descriptor.getOutputExpression().toString());
+      Assert.assertEquals(AggregationType.MAX_VALUE, descriptor.getAggregationType());
+      Assert.assertEquals(AggregationStep.FINAL, descriptor.getStep());
+      Assert.assertEquals(3, descriptor.getInputExpressions().size());
+      for (Expression expression : descriptor.getInputExpressions()) {
+        Assert.assertTrue(expression instanceof TimeSeriesOperand);
+        Assert.assertEquals("s1", ((TimeSeriesOperand) expression).getPath().getMeasurement());
+      }
+
+      Assert.assertEquals(Arrays.asList("key1", "max_value(s1)"), root.getOutputColumnNames());
+
+      Assert.assertNull(root.getGroupByTimeParameter());
+
+      Assert.assertEquals(Ordering.ASC, root.getScanOrder());
+
+      Assert.assertEquals(3, root.getChildren().size());
+      for (PlanNode child : root.getChildren()) {
+        Assert.assertTrue(
+            child instanceof AlignedSeriesAggregationScanNode
+                || child instanceof SeriesAggregationScanNode);
+      }
     } catch (Exception e) {
       e.printStackTrace();
       fail();
     }
-    return planNode;
+  }
+
+  @Test
+  public void testGroupByTagWithValueFilter() {
+    String sql = "select max_value(s1) from root.** where s1>1 group by tags(key1)";
+    try {
+      parseSQLToPlanNode(sql);
+      fail();
+    } catch (Exception e) {
+      Assert.assertTrue(
+          e.getMessage().contains("Only time filters are supported in GROUP BY TAGS query"));
+    }
+  }
+
+  private PlanNode parseSQLToPlanNode(String sql) {
+    Statement statement = StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset());
+    MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
+    Analyzer analyzer =
+        new Analyzer(context, new FakePartitionFetcherImpl(), new FakeSchemaFetcherImpl());
+    Analysis analysis = analyzer.analyze(statement);
+    LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
+    return planner.plan(analysis).getRootNode();
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
index 43b12eab4e..b1eed72479 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
@@ -47,7 +47,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSc
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
@@ -597,40 +597,40 @@ public class QueryLogicalPlanUtil {
             queryId.genPlanNodeId(),
             sourceNodeList,
             Arrays.asList(
-                new GroupByLevelDescriptor(
+                new CrossSeriesAggregationDescriptor(
                     AggregationType.COUNT.name().toLowerCase(),
                     AggregationStep.FINAL,
                     Arrays.asList(
                         new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")),
                         new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))),
                     new TimeSeriesOperand(schemaMap.get("root.sg.*.s1"))),
-                new GroupByLevelDescriptor(
+                new CrossSeriesAggregationDescriptor(
                     AggregationType.COUNT.name().toLowerCase(),
                     AggregationStep.FINAL,
                     Collections.singletonList(
                         new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s1"))),
                     new TimeSeriesOperand(schemaMap.get("root.sg.*.*.s1"))),
-                new GroupByLevelDescriptor(
+                new CrossSeriesAggregationDescriptor(
                     AggregationType.MAX_VALUE.name().toLowerCase(),
                     AggregationStep.FINAL,
                     Arrays.asList(
                         new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
                         new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2"))),
                     new TimeSeriesOperand(schemaMap.get("root.sg.*.s2"))),
-                new GroupByLevelDescriptor(
+                new CrossSeriesAggregationDescriptor(
                     AggregationType.MAX_VALUE.name().toLowerCase(),
                     AggregationStep.FINAL,
                     Collections.singletonList(
                         new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s2"))),
                     new TimeSeriesOperand(schemaMap.get("root.sg.*.*.s2"))),
-                new GroupByLevelDescriptor(
+                new CrossSeriesAggregationDescriptor(
                     AggregationType.LAST_VALUE.name().toLowerCase(),
                     AggregationStep.FINAL,
                     Arrays.asList(
                         new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")),
                         new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))),
                     new TimeSeriesOperand(schemaMap.get("root.sg.*.s1"))),
-                new GroupByLevelDescriptor(
+                new CrossSeriesAggregationDescriptor(
                     AggregationType.LAST_VALUE.name().toLowerCase(),
                     AggregationStep.FINAL,
                     Collections.singletonList(
@@ -848,21 +848,21 @@ public class QueryLogicalPlanUtil {
             queryId.genPlanNodeId(),
             Collections.singletonList(aggregationNode),
             Arrays.asList(
-                new GroupByLevelDescriptor(
+                new CrossSeriesAggregationDescriptor(
                     AggregationType.COUNT.name().toLowerCase(),
                     AggregationStep.FINAL,
                     Arrays.asList(
                         new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")),
                         new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))),
                     new TimeSeriesOperand(schemaMap.get("root.sg.*.s1"))),
-                new GroupByLevelDescriptor(
+                new CrossSeriesAggregationDescriptor(
                     AggregationType.MAX_VALUE.name().toLowerCase(),
                     AggregationStep.FINAL,
                     Arrays.asList(
                         new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
                         new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2"))),
                     new TimeSeriesOperand(schemaMap.get("root.sg.*.s2"))),
-                new GroupByLevelDescriptor(
+                new CrossSeriesAggregationDescriptor(
                     AggregationType.LAST_VALUE.name().toLowerCase(),
                     AggregationStep.FINAL,
                     Arrays.asList(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
index bacf89dcae..3368d7408f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
@@ -46,7 +46,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSo
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
@@ -219,7 +219,7 @@ public class AggregationDistributionTest {
                 genAggregationSourceNode(queryId, d1s1Path, AggregationType.COUNT),
                 genAggregationSourceNode(queryId, d2s1Path, AggregationType.COUNT)),
             Collections.singletonList(
-                new GroupByLevelDescriptor(
+                new CrossSeriesAggregationDescriptor(
                     AggregationType.COUNT.name().toLowerCase(),
                     AggregationStep.FINAL,
                     Arrays.asList(
@@ -257,7 +257,7 @@ public class AggregationDistributionTest {
                 genAggregationSourceNode(queryId, d3s1Path, AggregationType.COUNT),
                 genAggregationSourceNode(queryId, d4s1Path, AggregationType.COUNT)),
             Collections.singletonList(
-                new GroupByLevelDescriptor(
+                new CrossSeriesAggregationDescriptor(
                     AggregationType.COUNT.name().toLowerCase(),
                     AggregationStep.FINAL,
                     Arrays.asList(
@@ -319,7 +319,7 @@ public class AggregationDistributionTest {
             new PlanNodeId("TestGroupByLevelNode"),
             Collections.singletonList(slidingWindowAggregationNode),
             Collections.singletonList(
-                new GroupByLevelDescriptor(
+                new CrossSeriesAggregationDescriptor(
                     AggregationType.COUNT.name().toLowerCase(),
                     AggregationStep.FINAL,
                     Arrays.asList(
@@ -396,12 +396,12 @@ public class AggregationDistributionTest {
                 genAggregationSourceNode(queryId, d1s1Path, AggregationType.COUNT),
                 genAggregationSourceNode(queryId, d1s2Path, AggregationType.COUNT)),
             Arrays.asList(
-                new GroupByLevelDescriptor(
+                new CrossSeriesAggregationDescriptor(
                     AggregationType.COUNT.name().toLowerCase(),
                     AggregationStep.FINAL,
                     Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s1Path))),
                     new TimeSeriesOperand(new PartialPath(groupedPathS1))),
-                new GroupByLevelDescriptor(
+                new CrossSeriesAggregationDescriptor(
                     AggregationType.COUNT.name().toLowerCase(),
                     AggregationStep.FINAL,
                     Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s2Path))),
@@ -456,14 +456,14 @@ public class AggregationDistributionTest {
                 genAggregationSourceNode(queryId, d1s2Path, AggregationType.COUNT),
                 genAggregationSourceNode(queryId, d2s1Path, AggregationType.COUNT)),
             Arrays.asList(
-                new GroupByLevelDescriptor(
+                new CrossSeriesAggregationDescriptor(
                     AggregationType.COUNT.name().toLowerCase(),
                     AggregationStep.FINAL,
                     Arrays.asList(
                         new TimeSeriesOperand(new PartialPath(d1s1Path)),
                         new TimeSeriesOperand(new PartialPath(d2s1Path))),
                     new TimeSeriesOperand(new PartialPath(groupedPathS1))),
-                new GroupByLevelDescriptor(
+                new CrossSeriesAggregationDescriptor(
                     AggregationType.COUNT.name().toLowerCase(),
                     AggregationStep.FINAL,
                     Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s2Path))),
@@ -531,14 +531,14 @@ public class AggregationDistributionTest {
             new PlanNodeId("TestGroupByLevelNode"),
             Collections.singletonList(slidingWindowAggregationNode),
             Arrays.asList(
-                new GroupByLevelDescriptor(
+                new CrossSeriesAggregationDescriptor(
                     AggregationType.COUNT.name().toLowerCase(),
                     AggregationStep.FINAL,
                     Arrays.asList(
                         new TimeSeriesOperand(new PartialPath(d1s1Path)),
                         new TimeSeriesOperand(new PartialPath(d2s1Path))),
                     new TimeSeriesOperand(new PartialPath(groupedPathS1))),
-                new GroupByLevelDescriptor(
+                new CrossSeriesAggregationDescriptor(
                     AggregationType.COUNT.name().toLowerCase(),
                     AggregationStep.FINAL,
                     Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s2Path))),
@@ -637,9 +637,9 @@ public class AggregationDistributionTest {
 
   private void verifyGroupByLevelDescriptor(
       Map<String, List<String>> expected, GroupByLevelNode node) {
-    List<GroupByLevelDescriptor> descriptors = node.getGroupByLevelDescriptors();
+    List<CrossSeriesAggregationDescriptor> descriptors = node.getGroupByLevelDescriptors();
     assertEquals(expected.size(), descriptors.size());
-    for (GroupByLevelDescriptor descriptor : descriptors) {
+    for (CrossSeriesAggregationDescriptor descriptor : descriptors) {
       String outputExpression = descriptor.getOutputExpression().getExpressionString();
       assertEquals(expected.get(outputExpression).size(), descriptor.getInputExpressions().size());
       for (Expression inputExpression : descriptor.getInputExpressions()) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/metadata/read/SchemaFetchScanNodeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/metadata/read/SchemaFetchScanNodeTest.java
index ad6859d0bf..e7a7ad656c 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/metadata/read/SchemaFetchScanNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/metadata/read/SchemaFetchScanNodeTest.java
@@ -40,7 +40,11 @@ public class SchemaFetchScanNodeTest {
     patternTree.appendPathPattern(new PartialPath("root.sg.**.*"));
     SchemaFetchScanNode schemaFetchScanNode =
         new SchemaFetchScanNode(
-            new PlanNodeId("0"), new PartialPath("root.sg"), patternTree, Collections.emptyMap());
+            new PlanNodeId("0"),
+            new PartialPath("root.sg"),
+            patternTree,
+            Collections.emptyMap(),
+            true);
     ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 1024);
     schemaFetchScanNode.serialize(byteBuffer);
     byteBuffer.flip();
@@ -48,5 +52,6 @@ public class SchemaFetchScanNodeTest {
     Assert.assertEquals("root.sg", recoveredNode.getStorageGroup().getFullPath());
     Assert.assertEquals(
         "root.sg.**.*", recoveredNode.getPatternTree().getAllPathPatterns().get(0).getFullPath());
+    Assert.assertTrue(recoveredNode.isWithTags());
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByLevelNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByLevelNodeSerdeTest.java
index 499b15fac4..f94eec2ed5 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByLevelNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByLevelNodeSerdeTest.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
@@ -85,7 +85,7 @@ public class GroupByLevelNodeSerdeTest {
             new PlanNodeId("TestGroupByLevelNode"),
             Arrays.asList(seriesAggregationScanNode1, seriesAggregationScanNode2),
             Collections.singletonList(
-                new GroupByLevelDescriptor(
+                new CrossSeriesAggregationDescriptor(
                     AggregationType.MAX_TIME.name().toLowerCase(),
                     AggregationStep.FINAL,
                     Arrays.asList(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByTagNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByTagNodeSerdeTest.java
new file mode 100644
index 0000000000..feaaf4fcaa
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByTagNodeSerdeTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.plan.node.process;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression;
+import org.apache.iotdb.db.mpp.plan.plan.node.PlanNodeDeserializeHelper;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class GroupByTagNodeSerdeTest {
+
+  @Test
+  public void testSerializeAndDeserialize() throws IllegalPathException, IOException {
+    GroupByTimeParameter groupByTimeParameter =
+        new GroupByTimeParameter(1, 100, 1, 1, true, true, true);
+    CrossSeriesAggregationDescriptor s1MaxTime =
+        new CrossSeriesAggregationDescriptor(
+            AggregationType.MAX_TIME.name().toLowerCase(),
+            AggregationStep.FINAL,
+            Collections.singletonList(new TimeSeriesOperand(new PartialPath("root.sg.d1.s1"))),
+            new FunctionExpression(
+                "max_time",
+                new LinkedHashMap<>(),
+                Collections.singletonList(
+                    new TimeSeriesOperand(new PartialPath("root.sg.d1.s1")))));
+    CrossSeriesAggregationDescriptor s1Avg =
+        new CrossSeriesAggregationDescriptor(
+            AggregationType.AVG.name().toLowerCase(),
+            AggregationStep.FINAL,
+            Collections.singletonList(new TimeSeriesOperand(new PartialPath("root.sg.d1.s1"))),
+            new FunctionExpression(
+                "avg",
+                new LinkedHashMap<>(),
+                Collections.singletonList(
+                    new TimeSeriesOperand(new PartialPath("root.sg.d1.s1")))));
+    AggregationDescriptor s1MaxTimePartial =
+        new AggregationDescriptor(
+            AggregationType.MAX_TIME.name().toLowerCase(),
+            AggregationStep.PARTIAL,
+            Collections.singletonList(new TimeSeriesOperand(new PartialPath("root.sg.d1.s1"))));
+    AggregationDescriptor s1AvgTimePartial =
+        new AggregationDescriptor(
+            AggregationType.AVG.name().toLowerCase(),
+            AggregationStep.PARTIAL,
+            Collections.singletonList(new TimeSeriesOperand(new PartialPath("root.sg.d1.s1"))));
+    Map<List<String>, List<CrossSeriesAggregationDescriptor>> tagValuesToAggregationDescriptors =
+        new HashMap<>();
+    tagValuesToAggregationDescriptors.put(
+        Arrays.asList("v1", "v2"), Arrays.asList(s1MaxTime, s1Avg));
+    GroupByTagNode expectedNode =
+        new GroupByTagNode(
+            new PlanNodeId("TestGroupByTagNode"),
+            Collections.singletonList(
+                new SeriesAggregationScanNode(
+                    new PlanNodeId("TestSeriesAggregateScanNode1"),
+                    new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
+                    Arrays.asList(s1MaxTimePartial, s1AvgTimePartial),
+                    Ordering.ASC,
+                    null,
+                    groupByTimeParameter,
+                    null)),
+            groupByTimeParameter,
+            Ordering.ASC,
+            Collections.singletonList("k1"),
+            tagValuesToAggregationDescriptors,
+            Arrays.asList("MAX_TIME(root.sg.d1.s1)", "AVG(root.sg.d1.s1)"));
+
+    ByteBuffer byteBuffer = ByteBuffer.allocate(4096);
+    expectedNode.serialize(byteBuffer);
+    byteBuffer.flip();
+    Assert.assertEquals(expectedNode, PlanNodeDeserializeHelper.deserialize(byteBuffer));
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dataOutputStream = new DataOutputStream(baos);
+    expectedNode.serialize(dataOutputStream);
+    byte[] byteArray = baos.toByteArray();
+    ByteBuffer buffer = ByteBuffer.wrap(byteArray);
+    Assert.assertEquals(expectedNode, PlanNodeDeserializeHelper.deserialize(buffer));
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index 5959ba0d42..c8d96f2a19 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -844,7 +844,7 @@ public class ReadWriteIOUtils {
     int size = list.size();
     buffer.putInt(size);
     for (String s : list) {
-      buffer.put(s.getBytes());
+      write(s, buffer);
     }
   }