You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/10 12:57:55 UTC

[iotdb] branch master updated: [IOTDB-2658] Generate logical plan for query statement —— UT & Raw Data Query & Aggregation Query (#5469)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new cc78c84f99 [IOTDB-2658] Generate logical plan for query statement  —— UT & Raw Data Query & Aggregation Query (#5469)
cc78c84f99 is described below

commit cc78c84f99aadcf5aa59dcbdba862171f267173b
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Sun Apr 10 20:57:51 2022 +0800

    [IOTDB-2658] Generate logical plan for query statement  —— UT & Raw Data Query & Aggregation Query (#5469)
---
 .../exception/sql/StatementAnalyzeException.java   |   6 +
 .../db/mpp/common/filter/BasicFunctionFilter.java  |   8 +-
 .../iotdb/db/mpp/common/filter/InFilter.java       |   6 +-
 .../iotdb/db/mpp/common/filter/LikeFilter.java     |   8 +-
 .../iotdb/db/mpp/common/filter/QueryFilter.java    |  19 +-
 .../iotdb/db/mpp/common/filter/RegexpFilter.java   |   8 +-
 .../iotdb/db/mpp/common/schematree/SchemaTree.java |   6 +-
 .../apache/iotdb/db/mpp/sql/analyze/Analysis.java  |  21 +-
 .../apache/iotdb/db/mpp/sql/analyze/Analyzer.java  |  59 ++-
 .../db/mpp/sql/analyze/FakeSchemaFetcherImpl.java  |  51 +-
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  |   7 +-
 .../iotdb/db/mpp/sql/planner/LogicalPlanner.java   | 236 +++------
 .../iotdb/db/mpp/sql/planner/QueryPlanBuilder.java | 277 ++++++++++
 .../plan/IOutputPlanNode.java}                     |  17 +-
 .../db/mpp/sql/planner/plan/node/ColumnHeader.java |  88 ++++
 .../db/mpp/sql/planner/plan/node/PlanNode.java     |   2 -
 .../plan/node/metedata/read/ShowDevicesNode.java   |   5 -
 .../node/metedata/write/AlterTimeSeriesNode.java   |   5 -
 .../plan/node/metedata/write/AuthorNode.java       |   5 -
 .../write/CreateAlignedTimeSeriesNode.java         |   5 -
 .../node/metedata/write/CreateTimeSeriesNode.java  |   5 -
 .../planner/plan/node/process/AggregateNode.java   |  93 +++-
 .../planner/plan/node/process/DeviceMergeNode.java |  79 ++-
 .../planner/plan/node/process/ExchangeNode.java    |   5 -
 .../sql/planner/plan/node/process/FillNode.java    |  17 +-
 .../sql/planner/plan/node/process/FilterNode.java  |  59 ++-
 .../planner/plan/node/process/FilterNullNode.java  |  63 ++-
 .../plan/node/process/GroupByLevelNode.java        |  91 ++--
 .../sql/planner/plan/node/process/LimitNode.java   |  41 +-
 .../sql/planner/plan/node/process/OffsetNode.java  |  37 +-
 .../sql/planner/plan/node/process/SortNode.java    |  17 +-
 .../planner/plan/node/process/TimeJoinNode.java    |  78 ++-
 .../planner/plan/node/sink/FragmentSinkNode.java   |   5 -
 .../plan/node/source/SeriesAggregateScanNode.java  | 133 +++--
 .../planner/plan/node/source/SeriesScanNode.java   |  78 ++-
 .../sql/planner/plan/node/source/SourceNode.java   |  22 -
 .../plan/node/write/InsertMultiTabletsNode.java    |   5 -
 .../sql/planner/plan/node/write/InsertRowNode.java |   5 -
 .../planner/plan/node/write/InsertRowsNode.java    |   5 -
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |   5 -
 .../planner/plan/node/write/InsertTabletNode.java  |   5 -
 .../sql/rewriter/ColumnPaginationController.java   |  46 +-
 .../db/mpp/sql/rewriter/WildcardsRemover.java      |  24 +-
 .../db/mpp/sql/statement/StatementVisitor.java     |  28 +
 .../statement/component/GroupByLevelComponent.java |   5 +
 .../component/GroupByLevelController.java          |  16 +
 .../sql/statement/component/SelectComponent.java   |  16 +-
 .../statement/crud/AggregationQueryStatement.java  |  29 +-
 .../mpp/sql/statement/crud/FillQueryStatement.java |   7 +-
 .../statement/crud/GroupByFillQueryStatement.java  |   5 +
 .../sql/statement/crud/GroupByQueryStatement.java  |   5 +
 .../mpp/sql/statement/crud/LastQueryStatement.java |   7 +-
 .../db/mpp/sql/statement/crud/QueryStatement.java  |  41 +-
 .../mpp/sql/statement/crud/UDAFQueryStatement.java |   7 +-
 .../mpp/sql/statement/crud/UDTFQueryStatement.java |   6 +
 .../apache/iotdb/db/qp/constant/SQLConstant.java   |   4 +
 .../iotdb/db/query/expression/Expression.java      |   4 -
 .../query/expression/binary/BinaryExpression.java  |   7 -
 .../db/query/expression/unary/ConstantOperand.java |   7 -
 .../query/expression/unary/FunctionExpression.java |  11 -
 .../query/expression/unary/LogicNotExpression.java |   7 -
 .../query/expression/unary/NegationExpression.java |   7 -
 .../query/expression/unary/TimeSeriesOperand.java  |   8 -
 .../org/apache/iotdb/db/utils/SchemaUtils.java     |   2 +-
 .../db/mpp/sql/plan/DistributionPlannerTest.java   | 101 ++--
 .../iotdb/db/mpp/sql/plan/LogicalPlannerTest.java  | 103 +---
 .../db/mpp/sql/plan/QueryLogicalPlanUtil.java      | 567 +++++++++++++++++++++
 .../read/expression/impl/BinaryExpression.java     |  39 ++
 .../read/expression/impl/GlobalTimeExpression.java |  20 +
 .../expression/impl/SingleSeriesExpression.java    |  20 +
 70 files changed, 2143 insertions(+), 693 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/exception/sql/StatementAnalyzeException.java b/server/src/main/java/org/apache/iotdb/db/exception/sql/StatementAnalyzeException.java
index efecd6de7e..4d4cea20cc 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/sql/StatementAnalyzeException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/sql/StatementAnalyzeException.java
@@ -36,4 +36,10 @@ public class StatementAnalyzeException extends IoTDBException {
             filterOperator, filterType, FilterConstant.filterNames.get(filterType)),
         TSStatusCode.LOGICAL_OPTIMIZE_ERROR.getStatusCode());
   }
+
+  public StatementAnalyzeException(String type, String message) {
+    super(
+        String.format("Unsupported type: [%s]. %s", type, message),
+        TSStatusCode.LOGICAL_OPTIMIZE_ERROR.getStatusCode());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/BasicFunctionFilter.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/BasicFunctionFilter.java
index 46bb10ddfe..aec114cffb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/BasicFunctionFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/BasicFunctionFilter.java
@@ -19,8 +19,8 @@
 package org.apache.iotdb.db.mpp.common.filter;
 
 import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.LogicalOperatorException;
 import org.apache.iotdb.db.exception.sql.SQLParserException;
+import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.constant.FilterConstant;
 import org.apache.iotdb.db.mpp.sql.constant.FilterConstant.FilterType;
@@ -75,7 +75,7 @@ public class BasicFunctionFilter extends FunctionFilter {
   @Override
   protected Pair<IUnaryExpression, String> transformToSingleQueryFilter(
       Map<PartialPath, TSDataType> pathTSDataTypeHashMap)
-      throws LogicalOperatorException, MetadataException {
+      throws StatementAnalyzeException, MetadataException {
     TSDataType type = pathTSDataTypeHashMap.get(singlePath);
     if (type == null) {
       throw new MetadataException(
@@ -109,12 +109,12 @@ public class BasicFunctionFilter extends FunctionFilter {
                       ? new Binary(value.substring(1, value.length() - 1))
                       : new Binary(value));
         } else {
-          throw new LogicalOperatorException(
+          throw new StatementAnalyzeException(
               "For Basic operator,TEXT type only support EQUAL or NOTEQUAL operator");
         }
         break;
       default:
-        throw new LogicalOperatorException(type.toString(), "");
+        throw new StatementAnalyzeException(type.toString(), "");
     }
 
     return new Pair<>(ret, singlePath.getFullPath());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/InFilter.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/InFilter.java
index 4d31e28d3f..dbc3b818aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/InFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/InFilter.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.mpp.common.filter;
 
 import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.LogicalOperatorException;
+import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.constant.FilterConstant.FilterType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -75,7 +75,7 @@ public class InFilter extends FunctionFilter {
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   protected Pair<IUnaryExpression, String> transformToSingleQueryFilter(
       Map<PartialPath, TSDataType> pathTSDataTypeHashMap)
-      throws LogicalOperatorException, MetadataException {
+      throws StatementAnalyzeException, MetadataException {
     TSDataType type = pathTSDataTypeHashMap.get(singlePath);
     if (type == null) {
       throw new MetadataException(
@@ -131,7 +131,7 @@ public class InFilter extends FunctionFilter {
         ret = In.getUnaryExpression(singlePath, binaryValues, not);
         break;
       default:
-        throw new LogicalOperatorException(type.toString(), "");
+        throw new StatementAnalyzeException(type.toString(), "");
     }
 
     return new Pair<>(ret, singlePath.getFullPath());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/LikeFilter.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/LikeFilter.java
index bc8186bafd..659f744316 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/LikeFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/LikeFilter.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.mpp.common.filter;
 
 import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.LogicalOperatorException;
+import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.constant.FilterConstant.FilterType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -51,7 +51,7 @@ public class LikeFilter extends FunctionFilter {
   @Override
   protected Pair<IUnaryExpression, String> transformToSingleQueryFilter(
       Map<PartialPath, TSDataType> pathTSDataTypeHashMap)
-      throws LogicalOperatorException, MetadataException {
+      throws StatementAnalyzeException, MetadataException {
     TSDataType type = pathTSDataTypeHashMap.get(singlePath);
     if (type == null) {
       throw new MetadataException(
@@ -59,9 +59,9 @@ public class LikeFilter extends FunctionFilter {
     }
     IUnaryExpression ret;
     if (type != TEXT) {
-      throw new LogicalOperatorException(type.toString(), "Only TEXT is supported in 'Like'");
+      throw new StatementAnalyzeException(type.toString(), "Only TEXT is supported in 'Like'");
     } else if (value.startsWith("\"") && value.endsWith("\"")) {
-      throw new LogicalOperatorException(value, "Please use single quotation marks");
+      throw new StatementAnalyzeException(value, "Please use single quotation marks");
     } else {
       ret =
           Like.getUnaryExpression(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/QueryFilter.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/QueryFilter.java
index e5e4b2b293..0b74f3d52d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/QueryFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/QueryFilter.java
@@ -19,8 +19,7 @@
 package org.apache.iotdb.db.mpp.common.filter;
 
 import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.LogicalOperatorException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.constant.FilterConstant;
 import org.apache.iotdb.db.mpp.sql.constant.FilterConstant.FilterType;
@@ -124,18 +123,18 @@ public class QueryFilter implements Comparable<QueryFilter> {
    * @param pathTSDataTypeHashMap
    */
   public IExpression transformToExpression(Map<PartialPath, TSDataType> pathTSDataTypeHashMap)
-      throws QueryProcessException {
+      throws StatementAnalyzeException {
     if (isSingle) {
       Pair<IUnaryExpression, String> ret;
       try {
         ret = transformToSingleQueryFilter(pathTSDataTypeHashMap);
       } catch (MetadataException e) {
-        throw new QueryProcessException(e);
+        throw new StatementAnalyzeException("Meet error when transformToSingleQueryFilter");
       }
       return ret.left;
     } else {
       if (childOperators.isEmpty()) {
-        throw new LogicalOperatorException(
+        throw new StatementAnalyzeException(
             String.valueOf(filterType), "this filter is not leaf, but it's empty");
       }
       IExpression retFilter = childOperators.get(0).transformToExpression(pathTSDataTypeHashMap);
@@ -150,7 +149,7 @@ public class QueryFilter implements Comparable<QueryFilter> {
             retFilter = BinaryExpression.or(retFilter, currentFilter);
             break;
           default:
-            throw new LogicalOperatorException(
+            throw new StatementAnalyzeException(
                 String.valueOf(filterType), "Maybe it means " + getFilterName());
         }
       }
@@ -168,9 +167,9 @@ public class QueryFilter implements Comparable<QueryFilter> {
    */
   protected Pair<IUnaryExpression, String> transformToSingleQueryFilter(
       Map<PartialPath, TSDataType> pathTSDataTypeHashMap)
-      throws LogicalOperatorException, MetadataException {
+      throws StatementAnalyzeException, MetadataException {
     if (childOperators.isEmpty()) {
-      throw new LogicalOperatorException(
+      throw new StatementAnalyzeException(
           String.valueOf(filterType),
           "TransformToSingleFilter: this filter is not a leaf, but it's empty.");
     }
@@ -183,7 +182,7 @@ public class QueryFilter implements Comparable<QueryFilter> {
     for (int i = 1; i < childOperators.size(); i++) {
       currentPair = childOperators.get(i).transformToSingleQueryFilter(pathTSDataTypeHashMap);
       if (!path.equals(currentPair.right)) {
-        throw new LogicalOperatorException(
+        throw new StatementAnalyzeException(
             "TransformToSingleFilter: paths among children are not inconsistent: one is: "
                 + path
                 + ", another is: "
@@ -199,7 +198,7 @@ public class QueryFilter implements Comparable<QueryFilter> {
               FilterFactory.or(retFilter.getFilter(), currentPair.left.getFilter()));
           break;
         default:
-          throw new LogicalOperatorException(
+          throw new StatementAnalyzeException(
               String.valueOf(filterType), "Maybe it means " + getFilterName());
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/RegexpFilter.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/RegexpFilter.java
index 6001eed41e..5417a12d39 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/RegexpFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/filter/RegexpFilter.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.mpp.common.filter;
 
 import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.LogicalOperatorException;
+import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.constant.FilterConstant.FilterType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -51,7 +51,7 @@ public class RegexpFilter extends FunctionFilter {
   @Override
   protected Pair<IUnaryExpression, String> transformToSingleQueryFilter(
       Map<PartialPath, TSDataType> pathTSDataTypeHashMap)
-      throws LogicalOperatorException, MetadataException {
+      throws StatementAnalyzeException, MetadataException {
     TSDataType type = pathTSDataTypeHashMap.get(singlePath);
     if (type == null) {
       throw new MetadataException(
@@ -59,9 +59,9 @@ public class RegexpFilter extends FunctionFilter {
     }
     IUnaryExpression ret;
     if (type != TEXT) {
-      throw new LogicalOperatorException(type.toString(), "Only TEXT is supported in 'Regexp'");
+      throw new StatementAnalyzeException(type.toString(), "Only TEXT is supported in 'Regexp'");
     } else if (value.startsWith("\"") && value.endsWith("\"")) {
-      throw new LogicalOperatorException(value, "Please use single quotation marks");
+      throw new StatementAnalyzeException(value, "Please use single quotation marks");
     } else {
       ret =
           Regexp.getUnaryExpression(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
index 5543d3ab5f..240a6c1875 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
@@ -32,7 +32,11 @@ import java.util.List;
 
 public class SchemaTree {
 
-  private SchemaNode root;
+  private final SchemaNode root;
+
+  public SchemaTree(SchemaNode root) {
+    this.root = root;
+  }
 
   /**
    * Return all measurement paths for given path pattern and filter the result by slimit and offset.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
index 84f49b66ba..3a7b961bf4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
@@ -25,11 +25,10 @@ import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 /** Analysis used for planning a query. TODO: This class may need to store more info for a query. */
 public class Analysis {
@@ -51,7 +50,7 @@ public class Analysis {
 
   private SchemaTree schemaTree;
 
-  private Map<String, Set<PartialPath>> deviceIdToPathsMap;
+  private IExpression queryFilter;
 
   public List<RegionReplicaSet> getPartitionInfo(PartialPath seriesPath, Filter timefilter) {
     // TODO: (xingtanzjr) implement the calculation of timePartitionIdList
@@ -82,14 +81,6 @@ public class Analysis {
     this.schemaPartition = schemaPartition;
   }
 
-  public Map<String, Set<PartialPath>> getDeviceIdToPathsMap() {
-    return deviceIdToPathsMap;
-  }
-
-  public void setDeviceIdToPathsMap(Map<String, Set<PartialPath>> deviceIdToPathsMap) {
-    this.deviceIdToPathsMap = deviceIdToPathsMap;
-  }
-
   public SchemaTree getSchemaTree() {
     return schemaTree;
   }
@@ -97,4 +88,12 @@ public class Analysis {
   public void setSchemaTree(SchemaTree schemaTree) {
     this.schemaTree = schemaTree;
   }
+
+  public IExpression getQueryFilter() {
+    return queryFilter;
+  }
+
+  public void setQueryFilter(IExpression expression) {
+    this.queryFilter = expression;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
index 4376baaa23..c0aaacb62e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
@@ -40,14 +40,26 @@ import org.apache.iotdb.db.mpp.sql.rewriter.RemoveNotOptimizer;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
 import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
+import org.apache.iotdb.db.mpp.sql.statement.component.ResultColumn;
 import org.apache.iotdb.db.mpp.sql.statement.component.WhereCondition;
 import org.apache.iotdb.db.mpp.sql.statement.crud.*;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateAlignedTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.sql.statement.sys.AuthorStatement;
-
-import java.util.*;
+import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /** Analyze the statement and generate Analysis. */
 public class Analyzer {
@@ -93,16 +105,27 @@ public class Analyzer {
         SchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree);
 
         // bind metadata, remove wildcards, and apply SLIMIT & SOFFSET
-        Map<String, Set<PartialPath>> deviceIdToPathsMap = new HashMap<>();
         rewrittenStatement =
-            (QueryStatement)
-                new WildcardsRemover().rewrite(rewrittenStatement, schemaTree, deviceIdToPathsMap);
+            (QueryStatement) new WildcardsRemover().rewrite(rewrittenStatement, schemaTree);
 
         // fetch partition information
+        Set<String> devicePathSet = new HashSet<>();
+        for (ResultColumn resultColumn : queryStatement.getSelectComponent().getResultColumns()) {
+          devicePathSet.addAll(
+              resultColumn.collectPaths().stream()
+                  .map(PartialPath::getDevice)
+                  .collect(Collectors.toList()));
+        }
+        if (queryStatement.getWhereCondition() != null) {
+          devicePathSet.addAll(
+              queryStatement.getWhereCondition().getQueryFilter().getPathSet().stream()
+                  .map(PartialPath::getDevice)
+                  .collect(Collectors.toList()));
+        }
         List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
-        for (String deviceId : deviceIdToPathsMap.keySet()) {
+        for (String devicePath : devicePathSet) {
           DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
-          dataPartitionQueryParam.setDevicePath(deviceId);
+          dataPartitionQueryParam.setDevicePath(devicePath);
           dataPartitionQueryParams.add(dataPartitionQueryParam);
         }
         DataPartition dataPartition =
@@ -115,13 +138,29 @@ public class Analyzer {
           filter = new RemoveNotOptimizer().optimize(filter);
           filter = new DnfFilterOptimizer().optimize(filter);
           filter = new MergeSingleFilterOptimizer().optimize(filter);
-          whereCondition.setQueryFilter(filter);
+
+          // transform QueryFilter to expression
+          List<PartialPath> filterPaths = new ArrayList<>(filter.getPathSet());
+          HashMap<PartialPath, TSDataType> pathTSDataTypeHashMap = new HashMap<>();
+          for (PartialPath filterPath : filterPaths) {
+            pathTSDataTypeHashMap.put(
+                filterPath,
+                SQLConstant.isReservedPath(filterPath)
+                    ? TSDataType.INT64
+                    : filterPath.getSeriesType());
+          }
+          IExpression expression = filter.transformToExpression(pathTSDataTypeHashMap);
+          expression =
+              ExpressionOptimizer.getInstance()
+                  .optimize(expression, queryStatement.getSelectComponent().getDeduplicatedPaths());
+          analysis.setQueryFilter(expression);
         }
         analysis.setStatement(rewrittenStatement);
         analysis.setSchemaTree(schemaTree);
-        analysis.setDeviceIdToPathsMap(deviceIdToPathsMap);
         analysis.setDataPartitionInfo(dataPartition);
-      } catch (StatementAnalyzeException | PathNumOverLimitException e) {
+      } catch (StatementAnalyzeException
+          | PathNumOverLimitException
+          | QueryFilterOptimizationException e) {
         e.printStackTrace();
       }
       return analysis;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java
index ed23e6e4d8..6e5ed07282 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java
@@ -20,22 +20,65 @@
 package org.apache.iotdb.db.mpp.sql.analyze;
 
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
-import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
+import org.apache.iotdb.db.mpp.common.schematree.*;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.util.List;
 
 public class FakeSchemaFetcherImpl implements ISchemaFetcher {
+
+  private final SchemaTree schemaTree = new SchemaTree(generateSchemaTree());
+
   @Override
   public SchemaTree fetchSchema(PathPatternTree patternTree) {
-    return new SchemaTree();
+    return schemaTree;
   }
 
   @Override
   public SchemaTree fetchSchemaWithAutoCreate(
       PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes) {
-    return null;
+    return schemaTree;
+  }
+
+  /**
+   * Generates a fixed schema tree: root.sg.d1.s1, root.sg.d1.s2(status), root.sg.d2.s1,
+   * root.sg.d2.s2(status), root.sg.d2.a.s1, root.sg.d2.a.s2(status); entity "a" is aligned.
+   *
+   * @return the root node of the generated schemaTree
+   */
+  private SchemaNode generateSchemaTree() {
+    SchemaNode root = new SchemaInternalNode("root");
+
+    SchemaNode sg = new SchemaInternalNode("sg");
+    root.addChild("sg", sg);
+
+    SchemaEntityNode d1 = new SchemaEntityNode("d1");
+    sg.addChild("d1", d1);
+    // the same s1/s2 measurement node instances are attached to d1, d2 and d2.a
+    SchemaMeasurementNode s1 =
+        new SchemaMeasurementNode("s1", new MeasurementSchema("s1", TSDataType.INT32));
+    d1.addChild("s1", s1);
+    SchemaMeasurementNode s2 =
+        new SchemaMeasurementNode("s2", new MeasurementSchema("s2", TSDataType.INT32));
+    s2.setAlias("status");
+    d1.addChild("s2", s2);
+    d1.addAliasChild("status", s2);
+
+    SchemaEntityNode d2 = new SchemaEntityNode("d2");
+    sg.addChild("d2", d2);
+    d2.addChild("s1", s1);
+    d2.addChild("s2", s2);
+    d2.addAliasChild("status", s2);
+
+    SchemaEntityNode a = new SchemaEntityNode("a");
+    a.setAligned(true);
+    d2.addChild("a", a);
+    a.addChild("s1", s1);
+    a.addChild("s2", s2);
+    a.addAliasChild("status", s2);
+
+    return root;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index 1dbc265c0c..cf40915784 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.mpp.buffer.DataBlockManager;
 import org.apache.iotdb.db.mpp.buffer.DataBlockService;
 import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
 import org.apache.iotdb.db.mpp.execution.DataDriver;
 import org.apache.iotdb.db.mpp.execution.DataDriverContext;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
@@ -56,6 +55,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanN
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import java.io.IOException;
@@ -168,7 +168,7 @@ public class LocalExecutionPlanner {
     public Operator visitFilter(FilterNode node, LocalExecutionPlanContext context) {
       PlanNode child = node.getChild();
 
-      QueryFilter filterExpression = node.getPredicate();
+      IExpression filterExpression = node.getPredicate();
       List<String> outputSymbols = node.getOutputColumnNames();
       return super.visitFilter(node, context);
     }
@@ -222,7 +222,8 @@ public class LocalExecutionPlanner {
               context.getNextOperatorId(),
               node.getPlanNodeId(),
               TimeJoinOperator.class.getSimpleName());
-      return new TimeJoinOperator(operatorContext, children, node.getMergeOrder(), node.getTypes());
+      return new TimeJoinOperator(
+          operatorContext, children, node.getMergeOrder(), node.getOutputColumnTypes());
     }
 
     @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
index 2db6e48a2f..1b842beac9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
@@ -19,8 +19,8 @@
 package org.apache.iotdb.db.mpp.sql.planner;
 
 import org.apache.iotdb.db.auth.AuthException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
-import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.optimization.PlanOptimizer;
 import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
@@ -29,25 +29,12 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSer
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AuthorNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNullNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.GroupByLevelNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.OffsetNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.SortNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertMultiTabletsNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowsNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
-import org.apache.iotdb.db.mpp.sql.statement.component.FillComponent;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
-import org.apache.iotdb.db.mpp.sql.statement.component.GroupByLevelComponent;
-import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
-import org.apache.iotdb.db.mpp.sql.statement.component.ResultColumn;
+import org.apache.iotdb.db.mpp.sql.statement.component.*;
 import org.apache.iotdb.db.mpp.sql.statement.crud.*;
 import org.apache.iotdb.db.mpp.sql.statement.crud.AggregationQueryStatement;
 import org.apache.iotdb.db.mpp.sql.statement.crud.FillQueryStatement;
@@ -58,17 +45,15 @@ import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateAlignedTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.sql.statement.sys.AuthorStatement;
-import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.read.expression.ExpressionType;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 /** Generate a logical plan for the statement. */
 public class LogicalPlanner {
@@ -98,7 +83,7 @@ public class LogicalPlanner {
    * This visitor is used to generate a logical plan for the statement and returns the {@link
    * PlanNode}.
    */
-  private class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryContext> {
+  private static class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryContext> {
 
     private final Analysis analysis;
 
@@ -108,157 +93,83 @@ public class LogicalPlanner {
 
     @Override
     public PlanNode visitQuery(QueryStatement queryStatement, MPPQueryContext context) {
-      PlanBuilder planBuilder = planSelectComponent(queryStatement);
-
-      if (queryStatement.getWhereCondition() != null) {
-        planBuilder =
-            planQueryFilter(planBuilder, queryStatement.getWhereCondition().getQueryFilter());
-      }
-
-      if (queryStatement.isGroupByLevel()) {
-        planBuilder =
-            planGroupByLevel(
-                planBuilder,
-                ((AggregationQueryStatement) queryStatement).getGroupByLevelComponent());
-      }
-
-      if (queryStatement instanceof FillQueryStatement) {
-        planBuilder =
-            planFill(planBuilder, ((FillQueryStatement) queryStatement).getFillComponent());
-      }
-
-      planBuilder = planFilterNull(planBuilder, queryStatement.getFilterNullComponent());
-      planBuilder = planSort(planBuilder, queryStatement.getResultOrder());
-      planBuilder = planLimit(planBuilder, queryStatement.getRowLimit());
-      planBuilder = planOffset(planBuilder, queryStatement.getRowOffset());
+      QueryPlanBuilder planBuilder = new QueryPlanBuilder(context);
+
+      planBuilder.planRawDataQuerySource(
+          queryStatement.getDeviceNameToPathsMap(),
+          queryStatement.getResultOrder(),
+          queryStatement.isAlignByDevice(),
+          analysis.getQueryFilter(),
+          queryStatement.getSelectedPathNames());
+
+      planBuilder.planFilterNull(queryStatement.getFilterNullComponent());
+      planBuilder.planLimit(queryStatement.getRowLimit());
+      planBuilder.planOffset(queryStatement.getRowOffset());
       return planBuilder.getRoot();
     }
 
-    private PlanBuilder planSelectComponent(QueryStatement queryStatement) {
-      // TODO: generate SourceNode for QueryFilter
-      Map<String, Set<SourceNode>> deviceNameToSourceNodesMap = new HashMap<>();
-
-      for (ResultColumn resultColumn : queryStatement.getSelectComponent().getResultColumns()) {
-        Set<SourceNode> sourceNodes = planResultColumn(resultColumn);
-        for (SourceNode sourceNode : sourceNodes) {
-          String deviceName = sourceNode.getDeviceName();
-          deviceNameToSourceNodesMap
-              .computeIfAbsent(deviceName, k -> new HashSet<>())
-              .add(sourceNode);
-        }
-      }
-
-      if (queryStatement.isAlignByDevice()) {
-        DeviceMergeNode deviceMergeNode = new DeviceMergeNode(context.getQueryId().genPlanNodeId());
-        for (Map.Entry<String, Set<SourceNode>> entry : deviceNameToSourceNodesMap.entrySet()) {
-          String deviceName = entry.getKey();
-          List<PlanNode> planNodes = new ArrayList<>(entry.getValue());
-          if (planNodes.size() == 1) {
-            deviceMergeNode.addChildDeviceNode(deviceName, planNodes.get(0));
-          } else {
-            TimeJoinNode timeJoinNode =
-                new TimeJoinNode(
-                    context.getQueryId().genPlanNodeId(),
-                    queryStatement.getResultOrder(),
-                    null,
-                    planNodes);
-            deviceMergeNode.addChildDeviceNode(deviceName, timeJoinNode);
-          }
-        }
-        return new PlanBuilder(deviceMergeNode);
-      }
-
-      List<PlanNode> planNodes =
-          deviceNameToSourceNodesMap.entrySet().stream()
-              .flatMap(entry -> entry.getValue().stream())
-              .collect(Collectors.toList());
-      TimeJoinNode timeJoinNode =
-          new TimeJoinNode(
-              context.getQueryId().genPlanNodeId(),
-              queryStatement.getResultOrder(),
-              null,
-              planNodes);
-      return new PlanBuilder(timeJoinNode);
-    }
-
-    private Set<SourceNode> planResultColumn(ResultColumn resultColumn) {
-      Set<SourceNode> resultSourceNodeSet = new HashSet<>();
-      resultColumn
-          .getExpression()
-          .collectPlanNode(resultSourceNodeSet, context.getQueryId().genPlanNodeId());
-      return resultSourceNodeSet;
-    }
-
-    private PlanBuilder planQueryFilter(PlanBuilder planBuilder, QueryFilter queryFilter) {
-      if (queryFilter == null) {
-        return planBuilder;
+    @Override
+    public PlanNode visitAggregationQuery(
+        AggregationQueryStatement queryStatement, MPPQueryContext context) {
+      QueryPlanBuilder planBuilder = new QueryPlanBuilder(context);
+      Map<String, Map<PartialPath, Set<AggregationType>>> deviceNameToAggregationsMap;
+
+      if (analysis.getQueryFilter() != null
+          && analysis.getQueryFilter().getType() != ExpressionType.GLOBAL_TIME) {
+        // with value filter
+        planBuilder.planAggregationSourceWithValueFilter(
+            queryStatement.getDeviceNameToAggregationsMap(),
+            queryStatement.getDeviceNameToPathsMap(),
+            queryStatement.getResultOrder(),
+            queryStatement.isAlignByDevice(),
+            analysis.getQueryFilter(),
+            queryStatement.getSelectedPathNames());
+      } else {
+        // without value filter
+        planBuilder.planAggregationSourceWithoutValueFilter(
+            queryStatement.getDeviceNameToAggregationsMap(),
+            queryStatement.getResultOrder(),
+            queryStatement.isAlignByDevice(),
+            analysis.getQueryFilter());
       }
 
-      return planBuilder.withNewRoot(
-          new FilterNode(context.getQueryId().genPlanNodeId(), planBuilder.getRoot(), queryFilter));
+      planBuilder.planGroupByLevel(queryStatement.getGroupByLevelComponent());
+      planBuilder.planFilterNull(queryStatement.getFilterNullComponent());
+      planBuilder.planLimit(queryStatement.getRowLimit());
+      planBuilder.planOffset(queryStatement.getRowOffset());
+      return planBuilder.getRoot();
     }
 
-    private PlanBuilder planGroupByLevel(
-        PlanBuilder planBuilder, GroupByLevelComponent groupByLevelComponent) {
-      if (groupByLevelComponent == null) {
-        return planBuilder;
-      }
-
-      return planBuilder.withNewRoot(
-          new GroupByLevelNode(
-              context.getQueryId().genPlanNodeId(),
-              planBuilder.getRoot(),
-              groupByLevelComponent.getLevels(),
-              groupByLevelComponent.getGroupedPathMap()));
+    @Override
+    public PlanNode visitGroupByQuery(
+        GroupByQueryStatement queryStatement, MPPQueryContext context) {
+      throw new UnsupportedOperationException();
     }
 
-    private PlanBuilder planFill(PlanBuilder planBuilder, FillComponent fillComponent) {
-      // TODO: support Fill
-      return planBuilder;
+    @Override
+    public PlanNode visitGroupByFillQuery(
+        GroupByFillQueryStatement queryStatement, MPPQueryContext context) {
+      throw new UnsupportedOperationException();
     }
 
-    private PlanBuilder planFilterNull(
-        PlanBuilder planBuilder, FilterNullComponent filterNullComponent) {
-      if (filterNullComponent == null) {
-        return planBuilder;
-      }
-
-      return planBuilder.withNewRoot(
-          new FilterNullNode(
-              context.getQueryId().genPlanNodeId(),
-              planBuilder.getRoot(),
-              filterNullComponent.getWithoutPolicyType(),
-              filterNullComponent.getWithoutNullColumns().stream()
-                  .map(Expression::getExpressionString)
-                  .collect(Collectors.toList())));
+    @Override
+    public PlanNode visitFillQuery(FillQueryStatement queryStatement, MPPQueryContext context) {
+      throw new UnsupportedOperationException();
     }
 
-    private PlanBuilder planSort(PlanBuilder planBuilder, OrderBy resultOrder) {
-      if (resultOrder == null || resultOrder == OrderBy.TIMESTAMP_ASC) {
-        return planBuilder;
-      }
-
-      return planBuilder.withNewRoot(
-          new SortNode(
-              context.getQueryId().genPlanNodeId(), planBuilder.getRoot(), null, resultOrder));
+    @Override
+    public PlanNode visitLastQuery(LastQueryStatement queryStatement, MPPQueryContext context) {
+      throw new UnsupportedOperationException();
     }
 
-    private PlanBuilder planLimit(PlanBuilder planBuilder, int rowLimit) {
-      if (rowLimit == 0) {
-        return planBuilder;
-      }
-
-      return planBuilder.withNewRoot(
-          new LimitNode(context.getQueryId().genPlanNodeId(), rowLimit, planBuilder.getRoot()));
+    @Override
+    public PlanNode visitUDTFQuery(UDTFQueryStatement queryStatement, MPPQueryContext context) {
+      throw new UnsupportedOperationException();
     }
 
-    private PlanBuilder planOffset(PlanBuilder planBuilder, int rowOffset) {
-      if (rowOffset == 0) {
-        return planBuilder;
-      }
-
-      return planBuilder.withNewRoot(
-          new OffsetNode(context.getQueryId().genPlanNodeId(), planBuilder.getRoot(), rowOffset));
+    @Override
+    public PlanNode visitUDAFQuery(UDAFQueryStatement queryStatement, MPPQueryContext context) {
+      throw new UnsupportedOperationException();
     }
 
     @Override
@@ -557,21 +468,4 @@ public class LogicalPlanner {
       return insertRowsNode;
     }
   }
-
-  private class PlanBuilder {
-
-    private PlanNode root;
-
-    public PlanBuilder(PlanNode root) {
-      this.root = root;
-    }
-
-    public PlanNode getRoot() {
-      return root;
-    }
-
-    public PlanBuilder withNewRoot(PlanNode newRoot) {
-      return new PlanBuilder(newRoot);
-    }
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java
new file mode 100644
index 0000000000..788998f87b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.planner;
+
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.*;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
+import org.apache.iotdb.db.mpp.sql.statement.component.GroupByLevelComponent;
+import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Incrementally assembles the logical plan tree of a query statement.
+ *
+ * <p>A {@code plan*Source} method installs the initial root; every later {@code plan*} call wraps
+ * the current root in a new node, so the call order defines the nesting of the final plan.
+ */
+public class QueryPlanBuilder {
+
+  /** Root of the plan built so far; {@code null} until a source has been planned. */
+  private PlanNode root;
+
+  private final MPPQueryContext context;
+
+  public QueryPlanBuilder(MPPQueryContext context) {
+    this.context = context;
+  }
+
+  /** Returns the root of the plan built so far, or {@code null} if no source was planned yet. */
+  public PlanNode getRoot() {
+    return root;
+  }
+
+  /**
+   * Plans the sources of a raw data query: one {@link SeriesScanNode} per selected path, joined
+   * per device when aligning by device and across all devices otherwise.
+   *
+   * @param deviceNameToPathsMap selected paths grouped by device name
+   * @param scanOrder order handed to the scan nodes and to the joining nodes
+   * @param isAlignByDevice whether the per-device results go under a {@link DeviceMergeNode}
+   * @param queryFilter filter put on top of the joined sources, or {@code null} for none
+   * @param selectedPathList output column names handed to the {@link FilterNode}
+   */
+  public void planRawDataQuerySource(
+      Map<String, Set<PartialPath>> deviceNameToPathsMap,
+      OrderBy scanOrder,
+      boolean isAlignByDevice,
+      IExpression queryFilter,
+      List<String> selectedPathList) {
+    Map<String, List<PlanNode>> deviceNameToSourceNodesMap =
+        planSeriesScanSources(deviceNameToPathsMap, scanOrder);
+
+    if (isAlignByDevice) {
+      planDeviceMerge(deviceNameToSourceNodesMap, scanOrder, queryFilter, selectedPathList);
+    } else {
+      planTimeJoin(deviceNameToSourceNodesMap, scanOrder, queryFilter, selectedPathList);
+    }
+  }
+
+  /**
+   * Plans the sources of an aggregation query without value filter: one {@link
+   * SeriesAggregateScanNode} per aggregated path, each receiving the time filter directly.
+   *
+   * @param deviceNameToAggregationsMap aggregation types per path, grouped by device name
+   * @param scanOrder order handed to the scan nodes and to the joining nodes
+   * @param isAlignByDevice whether the per-device results go under a {@link DeviceMergeNode}
+   * @param queryFilter {@code null}, or a {@link GlobalTimeExpression} supplying the time filter
+   * @throws ClassCastException if {@code queryFilter} is any other kind of expression
+   */
+  public void planAggregationSourceWithoutValueFilter(
+      Map<String, Map<PartialPath, Set<AggregationType>>> deviceNameToAggregationsMap,
+      OrderBy scanOrder,
+      boolean isAlignByDevice,
+      IExpression queryFilter) {
+    Filter timeFilter = null;
+    if (queryFilter != null) {
+      timeFilter = ((GlobalTimeExpression) queryFilter).getFilter();
+    }
+
+    Map<String, List<PlanNode>> deviceNameToSourceNodesMap = new HashMap<>();
+    for (Map.Entry<String, Map<PartialPath, Set<AggregationType>>> deviceEntry :
+        deviceNameToAggregationsMap.entrySet()) {
+      for (Map.Entry<PartialPath, Set<AggregationType>> aggregationEntry :
+          deviceEntry.getValue().entrySet()) {
+        deviceNameToSourceNodesMap
+            .computeIfAbsent(deviceEntry.getKey(), k -> new ArrayList<>())
+            .add(
+                new SeriesAggregateScanNode(
+                    context.getQueryId().genPlanNodeId(),
+                    aggregationEntry.getKey(),
+                    new ArrayList<>(aggregationEntry.getValue()),
+                    scanOrder,
+                    timeFilter,
+                    null));
+      }
+    }
+
+    // The time filter already lives in the scan nodes, so no FilterNode is added on top.
+    if (isAlignByDevice) {
+      planDeviceMerge(deviceNameToSourceNodesMap, scanOrder, null, null);
+    } else {
+      planTimeJoin(deviceNameToSourceNodesMap, scanOrder, null, null);
+    }
+  }
+
+  /**
+   * Plans the sources of an aggregation query with a value filter: raw {@link SeriesScanNode}s
+   * are joined and filtered first, and an {@link AggregateNode} aggregates the filtered rows.
+   * When aligning by device, each device gets its own join/filter/aggregate chain.
+   *
+   * @param deviceNameToAggregationsMap aggregation types per path, grouped by device name
+   * @param deviceNameToPathsMap paths to scan, grouped by device name
+   * @param scanOrder order handed to the scan nodes and to the joining nodes
+   * @param isAlignByDevice whether the per-device results go under a {@link DeviceMergeNode}
+   * @param queryFilter filter put on top of the joined sources
+   * @param selectedPathList output column names handed to the {@link FilterNode}
+   */
+  public void planAggregationSourceWithValueFilter(
+      Map<String, Map<PartialPath, Set<AggregationType>>> deviceNameToAggregationsMap,
+      Map<String, Set<PartialPath>> deviceNameToPathsMap,
+      OrderBy scanOrder,
+      boolean isAlignByDevice,
+      IExpression queryFilter,
+      List<String> selectedPathList) {
+    Map<String, List<PlanNode>> deviceNameToSourceNodesMap =
+        planSeriesScanSources(deviceNameToPathsMap, scanOrder);
+
+    if (isAlignByDevice) {
+      planDeviceMergeForAggregation(
+          deviceNameToAggregationsMap,
+          deviceNameToSourceNodesMap,
+          scanOrder,
+          queryFilter,
+          selectedPathList);
+    } else {
+      planTimeJoin(deviceNameToSourceNodesMap, scanOrder, queryFilter, selectedPathList);
+      // Align by time: a single AggregateNode covers the aggregations of every device.
+      Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
+      deviceNameToAggregationsMap.values().forEach(aggregateFuncMap::putAll);
+      this.root =
+          new AggregateNode(
+              context.getQueryId().genPlanNodeId(), this.getRoot(), aggregateFuncMap, null);
+    }
+  }
+
+  /**
+   * Creates one {@link SeriesScanNode} per path, grouped by device name. A device whose path set
+   * is empty gets no entry in the result.
+   */
+  private Map<String, List<PlanNode>> planSeriesScanSources(
+      Map<String, Set<PartialPath>> deviceNameToPathsMap, OrderBy scanOrder) {
+    Map<String, List<PlanNode>> deviceNameToSourceNodesMap = new HashMap<>();
+    for (Map.Entry<String, Set<PartialPath>> entry : deviceNameToPathsMap.entrySet()) {
+      Set<PartialPath> paths = entry.getValue();
+      // Every scan node of a device is given the measurements of all paths of that device.
+      Set<String> allSensors =
+          paths.stream().map(PartialPath::getMeasurement).collect(Collectors.toSet());
+      for (PartialPath path : paths) {
+        deviceNameToSourceNodesMap
+            .computeIfAbsent(entry.getKey(), k -> new ArrayList<>())
+            .add(
+                new SeriesScanNode(
+                    context.getQueryId().genPlanNodeId(), path, allSensors, scanOrder));
+      }
+    }
+    return deviceNameToSourceNodesMap;
+  }
+
+  /** Joins the source nodes of all devices into one subtree and makes it the root. */
+  public void planTimeJoin(
+      Map<String, List<PlanNode>> deviceNameToSourceNodesMap,
+      OrderBy mergeOrder,
+      IExpression queryFilter,
+      List<String> selectedPathList) {
+    List<PlanNode> sourceNodes =
+        deviceNameToSourceNodesMap.values().stream()
+            .flatMap(List::stream)
+            .collect(Collectors.toList());
+    this.root = convergeWithTimeJoin(sourceNodes, mergeOrder, queryFilter, selectedPathList);
+  }
+
+  /**
+   * Joins the source nodes of each device separately and makes a {@link DeviceMergeNode} holding
+   * one child per device the root.
+   */
+  public void planDeviceMerge(
+      Map<String, List<PlanNode>> deviceNameToSourceNodesMap,
+      OrderBy mergeOrder,
+      IExpression queryFilter,
+      List<String> selectedPathList) {
+    DeviceMergeNode deviceMergeNode =
+        new DeviceMergeNode(context.getQueryId().genPlanNodeId(), mergeOrder);
+    for (Map.Entry<String, List<PlanNode>> entry : deviceNameToSourceNodesMap.entrySet()) {
+      String deviceName = entry.getKey();
+      List<PlanNode> planNodes = new ArrayList<>(entry.getValue());
+      deviceMergeNode.addChildDeviceNode(
+          deviceName, convergeWithTimeJoin(planNodes, mergeOrder, queryFilter, selectedPathList));
+    }
+    this.root = deviceMergeNode;
+  }
+
+  /**
+   * Like {@link #planDeviceMerge}, but puts an {@link AggregateNode} with the device's
+   * aggregations on top of each device's joined (and filtered) sources.
+   */
+  private void planDeviceMergeForAggregation(
+      Map<String, Map<PartialPath, Set<AggregationType>>> deviceNameToAggregationsMap,
+      Map<String, List<PlanNode>> deviceNameToSourceNodesMap,
+      OrderBy mergeOrder,
+      IExpression queryFilter,
+      List<String> selectedPathList) {
+    DeviceMergeNode deviceMergeNode =
+        new DeviceMergeNode(context.getQueryId().genPlanNodeId(), mergeOrder);
+    for (Map.Entry<String, List<PlanNode>> entry : deviceNameToSourceNodesMap.entrySet()) {
+      String deviceName = entry.getKey();
+      List<PlanNode> planNodes = new ArrayList<>(entry.getValue());
+      PlanNode timeJoinNode =
+          convergeWithTimeJoin(planNodes, mergeOrder, queryFilter, selectedPathList);
+      AggregateNode aggregateNode =
+          new AggregateNode(
+              context.getQueryId().genPlanNodeId(),
+              timeJoinNode,
+              deviceNameToAggregationsMap.get(deviceName),
+              null);
+      deviceMergeNode.addChildDeviceNode(deviceName, aggregateNode);
+    }
+    this.root = deviceMergeNode;
+  }
+
+  /**
+   * Combines source nodes into one subtree: exactly one source is used as is, otherwise the
+   * sources go under a {@link TimeJoinNode}; a non-null filter adds a {@link FilterNode} on top.
+   */
+  private PlanNode convergeWithTimeJoin(
+      List<PlanNode> sourceNodes,
+      OrderBy mergeOrder,
+      IExpression queryFilter,
+      List<String> outputColumnNames) {
+    PlanNode tmpNode;
+    if (sourceNodes.size() == 1) {
+      tmpNode = sourceNodes.get(0);
+    } else {
+      tmpNode = new TimeJoinNode(context.getQueryId().genPlanNodeId(), mergeOrder, sourceNodes);
+    }
+
+    if (queryFilter != null) {
+      tmpNode =
+          new FilterNode(
+              context.getQueryId().genPlanNodeId(), tmpNode, queryFilter, outputColumnNames);
+    }
+
+    return tmpNode;
+  }
+
+  /** Wraps the root in a {@link GroupByLevelNode}; does nothing when the component is null. */
+  public void planGroupByLevel(GroupByLevelComponent groupByLevelComponent) {
+    if (groupByLevelComponent == null) {
+      return;
+    }
+
+    this.root =
+        new GroupByLevelNode(
+            context.getQueryId().genPlanNodeId(),
+            this.getRoot(),
+            groupByLevelComponent.getLevels(),
+            groupByLevelComponent.getGroupedHeaderMap());
+  }
+
+  /** Wraps the root in a {@link FilterNullNode}; does nothing when the component is null. */
+  public void planFilterNull(FilterNullComponent filterNullComponent) {
+    if (filterNullComponent == null) {
+      return;
+    }
+
+    this.root =
+        new FilterNullNode(
+            context.getQueryId().genPlanNodeId(),
+            this.getRoot(),
+            filterNullComponent.getWithoutPolicyType(),
+            filterNullComponent.getWithoutNullColumns().stream()
+                .map(Expression::getExpressionString)
+                .collect(Collectors.toList()));
+  }
+
+  /** Wraps the root in a {@link LimitNode}; a limit of 0 adds nothing. */
+  public void planLimit(int rowLimit) {
+    if (rowLimit == 0) {
+      return;
+    }
+
+    this.root = new LimitNode(context.getQueryId().genPlanNodeId(), this.getRoot(), rowLimit);
+  }
+
+  /** Wraps the root in an {@link OffsetNode}; an offset of 0 adds nothing. */
+  public void planOffset(int rowOffset) {
+    if (rowOffset == 0) {
+      return;
+    }
+
+    this.root = new OffsetNode(context.getQueryId().genPlanNodeId(), this.getRoot(), rowOffset);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/UDTFQueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IOutputPlanNode.java
similarity index 68%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/UDTFQueryStatement.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IOutputPlanNode.java
index 79d3c75efa..f88ad5d70d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/UDTFQueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IOutputPlanNode.java
@@ -17,11 +17,27 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.sql.statement.crud;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
 
-public class UDTFQueryStatement extends QueryStatement {
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
-  public UDTFQueryStatement(QueryStatement queryStatement) {
-    super(queryStatement);
-  }
+import java.util.List;
+
+/**
+ * Accessors for the output columns of a plan node.
+ *
+ * <p>NOTE(review): the three lists presumably describe the same columns in the same order;
+ * confirm against the implementing nodes.
+ */
+public interface IOutputPlanNode {
+
+  /** Returns the headers of the output columns. */
+  List<ColumnHeader> getOutputColumnHeaders();
+
+  /** Returns the names of the output columns. */
+  List<String> getOutputColumnNames();
+
+  /** Returns the data types of the output columns. */
+  List<TSDataType> getOutputColumnTypes();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/ColumnHeader.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/ColumnHeader.java
new file mode 100644
index 0000000000..afb27badba
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/ColumnHeader.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.planner.plan.node;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.Locale;
+import java.util.Objects;
+
+/**
+ * Describes one output column: the path it is computed from, an optional function applied to
+ * that path, and the data type of its values. Equality and hashing use all three components.
+ */
+public class ColumnHeader {
+
+  private final String pathName;
+
+  /** Lower-cased function name, or {@code null} when the column is a plain path. */
+  private final String functionName;
+
+  private final TSDataType dataType;
+
+  /**
+   * Creates a header for a plain (non-function) column.
+   *
+   * @param pathName path of the column
+   * @param dataType data type of the column values
+   */
+  public ColumnHeader(String pathName, TSDataType dataType) {
+    this.pathName = pathName;
+    this.functionName = null;
+    this.dataType = dataType;
+  }
+
+  /**
+   * Creates a header for a column produced by applying a function to a path.
+   *
+   * @param pathName path the function is applied to
+   * @param functionName function name in any letter case; it is stored lower-cased
+   * @param dataType data type of the column values
+   * @throws NullPointerException if {@code functionName} is {@code null}
+   */
+  public ColumnHeader(String pathName, String functionName, TSDataType dataType) {
+    this.pathName = pathName;
+    // Locale.ROOT makes the result independent of the JVM default locale.
+    this.functionName = functionName.toLowerCase(Locale.ROOT);
+    this.dataType = dataType;
+  }
+
+  /**
+   * Returns the name of the column.
+   *
+   * @return {@code function(path)} when a function name is set, otherwise the bare path
+   */
+  public String getColumnName() {
+    if (functionName != null) {
+      return String.format("%s(%s)", functionName, pathName);
+    }
+    return pathName;
+  }
+
+  /** Returns the data type of the column values. */
+  public TSDataType getColumnType() {
+    return dataType;
+  }
+
+  /**
+   * Returns a new header whose path is replaced by the value of {@link
+   * PartialPath#getMeasurement()} for this header's path; function name and data type are kept.
+   *
+   * @return the rewritten header; this instance is left unchanged
+   * @throws IllegalStateException if the stored path is rejected by {@link PartialPath}
+   */
+  public ColumnHeader replacePathWithMeasurement() {
+    final String measurement;
+    try {
+      measurement = new PartialPath(pathName).getMeasurement();
+    } catch (IllegalPathException e) {
+      // Fail loudly: carrying on would silently produce a header whose path is null.
+      throw new IllegalStateException("Illegal path in column header: " + pathName, e);
+    }
+    if (functionName != null) {
+      return new ColumnHeader(measurement, functionName, dataType);
+    }
+    return new ColumnHeader(measurement, dataType);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    ColumnHeader that = (ColumnHeader) o;
+    return Objects.equals(pathName, that.pathName)
+        && Objects.equals(functionName, that.functionName)
+        && dataType == that.dataType;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(pathName, functionName, dataType);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
index 3fe488161c..9783d2ab2a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
@@ -74,8 +74,6 @@ public abstract class PlanNode {
 
   public abstract int allowedChildCount();
 
-  public abstract List<String> getOutputColumnNames();
-
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitPlan(this, context);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowDevicesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowDevicesNode.java
index c00130bf14..5c7c44261c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowDevicesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/ShowDevicesNode.java
@@ -49,11 +49,6 @@ public class ShowDevicesNode extends ShowNode {
     return NO_CHILD_ALLOWED;
   }
 
-  @Override
-  public List<String> getOutputColumnNames() {
-    return null;
-  }
-
   public static ShowDevicesNode deserialize(ByteBuffer byteBuffer) {
     return null;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
index 492a1b892b..374279efad 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
@@ -135,11 +135,6 @@ public class AlterTimeSeriesNode extends PlanNode {
     return NO_CHILD_ALLOWED;
   }
 
-  @Override
-  public List<String> getOutputColumnNames() {
-    return null;
-  }
-
   @Override
   public void serialize(ByteBuffer byteBuffer) {
     byteBuffer.putShort((short) PlanNodeType.ALTER_TIME_SERIES.ordinal());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AuthorNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AuthorNode.java
index 6ab7447804..7bb72a37fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AuthorNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AuthorNode.java
@@ -140,11 +140,6 @@ public class AuthorNode extends PlanNode {
     return 0;
   }
 
-  @Override
-  public List<String> getOutputColumnNames() {
-    return null;
-  }
-
   @Override
   public void serialize(ByteBuffer buffer) {
     buffer.putInt(getPlanType(authorType));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
index 722c6575e7..98561ab0e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
@@ -158,11 +158,6 @@ public class CreateAlignedTimeSeriesNode extends PlanNode {
     return NO_CHILD_ALLOWED;
   }
 
-  @Override
-  public List<String> getOutputColumnNames() {
-    return null;
-  }
-
   @Override
   public void serialize(ByteBuffer byteBuffer) {
     byteBuffer.putShort((short) PlanNodeType.CREATE_ALIGNED_TIME_SERIES.ordinal());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
index 5ba57ea192..db7f4f8c4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
@@ -156,11 +156,6 @@ public class CreateTimeSeriesNode extends PlanNode {
     return NO_CHILD_ALLOWED;
   }
 
-  @Override
-  public List<String> getOutputColumnNames() {
-    return null;
-  }
-
   public static CreateTimeSeriesNode deserialize(ByteBuffer byteBuffer) {
     return null;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
index 6105fbc2cb..b7d2502b35 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
@@ -18,55 +18,77 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
+import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import com.google.common.collect.ImmutableList;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * This node is used to aggregate required series from multiple sources. The source data will be
  * input as a TsBlock, it may be raw data or partial aggregation result. This node will output the
  * final series aggregated result represented by TsBlock.
  */
-public class AggregateNode extends ProcessNode {
-  // The parameter of `group by time`
-  // Its value will be null if there is no `group by time` clause,
-  private GroupByTimeParameter groupByTimeParameter;
+public class AggregateNode extends ProcessNode implements IOutputPlanNode {
+
+  // The map from columns to corresponding aggregation functions on that column.
+  //    KEY: The path of the series (column) to aggregate.
+  //    VALUE: Aggregation functions on this column.
+  // (Currently, we only support one series in the aggregation function.)
+  private final Map<PartialPath, Set<AggregationType>> aggregateFuncMap;
+
+  // The parameter of `group by time`.
+  // Its value will be null if there is no `group by time` clause.
+  private final GroupByTimeParameter groupByTimeParameter;
 
-  // The list of aggregation functions, each FunctionExpression will be output as one column of
-  // result TsBlock
-  // (Currently we only support one series in the aggregation function)
-  // TODO: need consider whether it is suitable the aggregation function using FunctionExpression
-  private Map<String, FunctionExpression> aggregateFuncMap;
+  private final List<ColumnHeader> columnHeaders = new ArrayList<>();
 
-  private final List<PlanNode> children;
-  private final List<String> columnNames;
+  private PlanNode child;
 
   public AggregateNode(
       PlanNodeId id,
-      Map<String, FunctionExpression> aggregateFuncMap,
-      List<PlanNode> children,
-      List<String> columnNames) {
+      PlanNode child,
+      Map<PartialPath, Set<AggregationType>> aggregateFuncMap,
+      GroupByTimeParameter groupByTimeParameter) {
     super(id);
+    this.child = child;
     this.aggregateFuncMap = aggregateFuncMap;
-    this.children = children;
-    this.columnNames = columnNames;
+    this.groupByTimeParameter = groupByTimeParameter;
+    for (Map.Entry<PartialPath, Set<AggregationType>> entry : aggregateFuncMap.entrySet()) {
+      PartialPath path = entry.getKey();
+      columnHeaders.addAll(
+          entry.getValue().stream()
+              .map(
+                  functionName ->
+                      new ColumnHeader(
+                          path.getFullPath(), functionName.name(), path.getSeriesType()))
+              .collect(Collectors.toList()));
+    }
   }
 
   @Override
   public List<PlanNode> getChildren() {
-    return children;
+    return ImmutableList.of(child);
   }
 
   @Override
   public void addChild(PlanNode child) {
-    throw new NotImplementedException("addChild of AggregateNode is not implemented");
+    this.child = child;
   }
 
   @Override
@@ -79,9 +101,19 @@ public class AggregateNode extends ProcessNode {
     return CHILD_COUNT_NO_LIMIT;
   }
 
+  @Override
+  public List<ColumnHeader> getOutputColumnHeaders() {
+    return columnHeaders;
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
-    return columnNames;
+    return columnHeaders.stream().map(ColumnHeader::getColumnName).collect(Collectors.toList());
+  }
+
+  @Override
+  public List<TSDataType> getOutputColumnTypes() {
+    return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
   }
 
   @Override
@@ -95,4 +127,25 @@ public class AggregateNode extends ProcessNode {
 
   @Override
   public void serialize(ByteBuffer byteBuffer) {}
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    AggregateNode that = (AggregateNode) o;
+    return Objects.equals(groupByTimeParameter, that.groupByTimeParameter)
+        && Objects.equals(aggregateFuncMap, that.aggregateFuncMap)
+        && Objects.equals(child, that.child);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(groupByTimeParameter, aggregateFuncMap, child);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
index c30f48e3ae..9b21f00198 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
@@ -19,17 +19,19 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.stream.Collectors;
 
 /**
  * DeviceMergeOperator is responsible for constructing a device-based view of a set of series. And
@@ -40,7 +42,8 @@ import java.util.Map;
  * same between these TsBlocks. If the input TsBlock contains n columns, the device-based view will
  * contain n+1 columns where the new column is Device column.
  */
-public class DeviceMergeNode extends ProcessNode {
+public class DeviceMergeNode extends ProcessNode implements IOutputPlanNode {
+
   // The result output order that this operator
   private OrderBy mergeOrder;
 
@@ -48,15 +51,15 @@ public class DeviceMergeNode extends ProcessNode {
   // The without policy is able to be push down to the DeviceMergeNode because we can know whether a
   // row contains
   // null or not.
-  private FilterNullPolicy filterNullPolicy;
+  private FilterNullComponent filterNullComponent;
 
   // The map from deviceName to corresponding query result node responsible for that device.
   // DeviceNode means the node whose output TsBlock contains the data belonged to one device.
-  private Map<String, PlanNode> childDeviceNodeMap;
+  private Map<String, PlanNode> childDeviceNodeMap = new HashMap<>();
 
   private List<PlanNode> children;
 
-  private List<String> columnNames;
+  private final List<ColumnHeader> columnHeaders = new ArrayList<>();;
 
   public DeviceMergeNode(PlanNodeId id) {
     super(id);
@@ -88,9 +91,35 @@ public class DeviceMergeNode extends ProcessNode {
     return CHILD_COUNT_NO_LIMIT;
   }
 
+  public void addChildDeviceNode(String deviceName, PlanNode childNode) {
+    this.childDeviceNodeMap.put(deviceName, childNode);
+    this.children.add(childNode);
+    updateColumnHeaders(childNode);
+  }
+
+  private void updateColumnHeaders(PlanNode childNode) {
+    List<ColumnHeader> childColumnHeaders = ((IOutputPlanNode) childNode).getOutputColumnHeaders();
+    for (ColumnHeader columnHeader : childColumnHeaders) {
+      ColumnHeader tmpColumnHeader = columnHeader.replacePathWithMeasurement();
+      if (!columnHeaders.contains(tmpColumnHeader)) {
+        columnHeaders.add(tmpColumnHeader);
+      }
+    }
+  }
+
+  @Override
+  public List<ColumnHeader> getOutputColumnHeaders() {
+    return columnHeaders;
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
-    return columnNames;
+    return columnHeaders.stream().map(ColumnHeader::getColumnName).collect(Collectors.toList());
+  }
+
+  @Override
+  public List<TSDataType> getOutputColumnTypes() {
+    return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
   }
 
   public OrderBy getMergeOrder() {
@@ -109,17 +138,6 @@ public class DeviceMergeNode extends ProcessNode {
   @Override
   public void serialize(ByteBuffer byteBuffer) {}
 
-  public DeviceMergeNode(PlanNodeId id, Map<String, PlanNode> deviceNodeMap) {
-    this(id);
-    this.childDeviceNodeMap = deviceNodeMap;
-    this.children.addAll(deviceNodeMap.values());
-  }
-
-  public void addChildDeviceNode(String deviceName, PlanNode childNode) {
-    this.childDeviceNodeMap.put(deviceName, childNode);
-    this.children.add(childNode);
-  }
-
   @TestOnly
   public Pair<String, List<String>> print() {
     String title = String.format("[DeviceMergeNode (%s)]", this.getPlanNodeId());
@@ -131,4 +149,25 @@ public class DeviceMergeNode extends ProcessNode {
   public void setChildren(List<PlanNode> children) {
     this.children = children;
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    DeviceMergeNode that = (DeviceMergeNode) o;
+    return mergeOrder == that.mergeOrder
+        && filterNullComponent == that.filterNullComponent
+        && Objects.equals(childDeviceNodeMap, that.childDeviceNodeMap);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(mergeOrder, filterNullComponent, childDeviceNodeMap);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index 21a3374644..42a8f5880a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -82,11 +82,6 @@ public class ExchangeNode extends PlanNode {
     this.upstreamPlanNodeId = nodeId;
   }
 
-  @Override
-  public List<String> getOutputColumnNames() {
-    return null;
-  }
-
   public static ExchangeNode deserialize(ByteBuffer byteBuffer) {
     return null;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
index 984fb047e2..147f817f4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
@@ -19,10 +19,13 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.google.common.collect.ImmutableList;
@@ -32,7 +35,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 /** FillNode is used to fill the empty field in one row. */
-public class FillNode extends ProcessNode {
+public class FillNode extends ProcessNode implements IOutputPlanNode {
 
   private PlanNode child;
 
@@ -68,9 +71,19 @@ public class FillNode extends ProcessNode {
     return ONE_CHILD;
   }
 
+  @Override
+  public List<ColumnHeader> getOutputColumnHeaders() {
+    return ((IOutputPlanNode) child).getOutputColumnHeaders();
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
-    return child.getOutputColumnNames();
+    return ((IOutputPlanNode) child).getOutputColumnNames();
+  }
+
+  @Override
+  public List<TSDataType> getOutputColumnTypes() {
+    return ((IOutputPlanNode) child).getOutputColumnTypes();
   }
 
   public FillPolicy getFillPolicy() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
index 3347eead1b..46aff629dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
@@ -19,10 +19,13 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
+import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.google.common.collect.ImmutableList;
@@ -30,22 +33,32 @@ import com.google.common.collect.ImmutableList;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 /** The FilterNode is responsible to filter the RowRecord from TsBlock. */
-public class FilterNode extends ProcessNode {
+public class FilterNode extends ProcessNode implements IOutputPlanNode {
 
   private PlanNode child;
 
-  private final QueryFilter predicate;
+  private final IExpression predicate;
 
-  public FilterNode(PlanNodeId id, QueryFilter predicate) {
+  private List<ColumnHeader> columnHeaders;
+
+  public FilterNode(PlanNodeId id, IExpression predicate) {
     super(id);
     this.predicate = predicate;
   }
 
-  public FilterNode(PlanNodeId id, PlanNode child, QueryFilter predicate) {
+  public FilterNode(
+      PlanNodeId id, PlanNode child, IExpression predicate, List<String> outputColumnNames) {
     this(id, predicate);
     this.child = child;
+    this.columnHeaders =
+        ((IOutputPlanNode) child)
+            .getOutputColumnHeaders().stream()
+                .filter(columnHeader -> outputColumnNames.contains(columnHeader.getColumnName()))
+                .collect(Collectors.toList());
   }
 
   @Override
@@ -68,9 +81,19 @@ public class FilterNode extends ProcessNode {
     return ONE_CHILD;
   }
 
+  @Override
+  public List<ColumnHeader> getOutputColumnHeaders() {
+    return columnHeaders;
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
-    return child.getOutputColumnNames();
+    return columnHeaders.stream().map(ColumnHeader::getColumnName).collect(Collectors.toList());
+  }
+
+  @Override
+  public List<TSDataType> getOutputColumnTypes() {
+    return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
   }
 
   @Override
@@ -85,7 +108,7 @@ public class FilterNode extends ProcessNode {
   @Override
   public void serialize(ByteBuffer byteBuffer) {}
 
-  public QueryFilter getPredicate() {
+  public IExpression getPredicate() {
     return predicate;
   }
 
@@ -98,6 +121,28 @@ public class FilterNode extends ProcessNode {
     String title = String.format("[FilterNode (%s)]", this.getPlanNodeId());
     List<String> attributes = new ArrayList<>();
     attributes.add("QueryFilter: " + this.getPredicate());
+    attributes.add("outputColumnNames: " + this.getOutputColumnNames());
     return new Pair<>(title, attributes);
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    FilterNode that = (FilterNode) o;
+    return Objects.equals(child, that.child)
+        && Objects.equals(predicate, that.predicate)
+        && Objects.equals(columnHeaders, that.columnHeaders);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(child, predicate, columnHeaders);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
index ef1a705b24..a2fb971b0d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
@@ -19,10 +19,13 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.google.common.collect.ImmutableList;
@@ -30,25 +33,23 @@ import com.google.common.collect.ImmutableList;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 /** WithoutNode is used to discard specific rows from upstream node. */
-public class FilterNullNode extends ProcessNode {
+public class FilterNullNode extends ProcessNode implements IOutputPlanNode {
 
   // The policy to discard the result from upstream operator
-  private FilterNullPolicy discardPolicy;
+  private final FilterNullPolicy discardPolicy;
 
-  private PlanNode child;
-
-  private List<String> filterNullColumnNames;
+  private final List<String> filterNullColumnNames;
 
-  public FilterNullNode(PlanNodeId id, PlanNode child) {
-    super(id);
-    this.child = child;
-  }
+  private PlanNode child;
 
-  public FilterNullNode(PlanNodeId id, FilterNullPolicy policy) {
+  public FilterNullNode(
+      PlanNodeId id, FilterNullPolicy policy, List<String> filterNullColumnNames) {
     super(id);
     this.discardPolicy = policy;
+    this.filterNullColumnNames = filterNullColumnNames;
   }
 
   public FilterNullNode(
@@ -56,9 +57,8 @@ public class FilterNullNode extends ProcessNode {
       PlanNode child,
       FilterNullPolicy discardPolicy,
       List<String> filterNullColumnNames) {
-    this(id, discardPolicy);
+    this(id, discardPolicy, filterNullColumnNames);
     this.child = child;
-    this.filterNullColumnNames = filterNullColumnNames;
   }
 
   @Override
@@ -73,7 +73,7 @@ public class FilterNullNode extends ProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new FilterNullNode(getPlanNodeId(), discardPolicy);
+    return new FilterNullNode(getPlanNodeId(), discardPolicy, filterNullColumnNames);
   }
 
   @Override
@@ -81,9 +81,19 @@ public class FilterNullNode extends ProcessNode {
     return ONE_CHILD;
   }
 
+  @Override
+  public List<ColumnHeader> getOutputColumnHeaders() {
+    return ((IOutputPlanNode) child).getOutputColumnHeaders();
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
-    return child.getOutputColumnNames();
+    return ((IOutputPlanNode) child).getOutputColumnNames();
+  }
+
+  @Override
+  public List<TSDataType> getOutputColumnTypes() {
+    return ((IOutputPlanNode) child).getOutputColumnTypes();
   }
 
   public FilterNullPolicy getDiscardPolicy() {
@@ -106,10 +116,6 @@ public class FilterNullNode extends ProcessNode {
   @Override
   public void serialize(ByteBuffer byteBuffer) {}
 
-  public void setFilterNullColumnNames(List<String> filterNullColumnNames) {
-    this.filterNullColumnNames = filterNullColumnNames;
-  }
-
   @TestOnly
   public Pair<String, List<String>> print() {
     String title = String.format("[FilterNullNode (%s)]", this.getPlanNodeId());
@@ -118,4 +124,25 @@ public class FilterNullNode extends ProcessNode {
     attributes.add("FilterNullColumnNames: " + this.getFilterNullColumnNames());
     return new Pair<>(title, attributes);
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    FilterNullNode that = (FilterNullNode) o;
+    return discardPolicy == that.discardPolicy
+        && Objects.equals(child, that.child)
+        && Objects.equals(filterNullColumnNames, that.filterNullColumnNames);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(discardPolicy, child, filterNullColumnNames);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
index a194ea05db..a8f41cfb71 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
@@ -19,10 +19,13 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.nio.ByteBuffer;
@@ -30,34 +33,43 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * This node is responsible for the final aggregation merge operation. It will process the data from
  * TsBlock row by row. For one row, it will rollup the fields which have the same aggregate function
  * and belong to one bucket. Here, that two columns belong to one bucket means the partial paths of
- * device after rolling up in specific level are the same. For example, let's say there are two
- * columns `root.sg.d1.s1` and `root.sg.d2.s1`. If the group by level parameter is [0, 1], then
- * these two columns will belong to one bucket and the bucket name is `root.sg.*.s1`. If the group
- * by level parameter is [0, 2], then these two columns will not belong to one bucket. And the total
- * buckets are `root.*.d1.s1` and `root.*.d2.s1`
+ * device after rolling up in specific level are the same.
+ *
+ * <p>For example, let's say there are two columns `root.sg.d1.s1` and `root.sg.d2.s1`.
+ *
+ * <p>If the group by level parameter is [0, 1], then these two columns will belong to one bucket
+ * and the bucket name is `root.sg.*.s1`.
+ *
+ * <p>If the group by level parameter is [0, 2], then these two columns will not belong to one
+ * bucket, and the resulting buckets are `root.*.d1.s1` and `root.*.d2.s1`.
  */
-public class GroupByLevelNode extends ProcessNode {
+public class GroupByLevelNode extends ProcessNode implements IOutputPlanNode {
 
-  private PlanNode child;
+  private final int[] groupByLevels;
 
-  private int[] groupByLevels;
+  private final Map<ColumnHeader, ColumnHeader> groupedPathMap;
 
-  private List<String> columnNames;
+  private final PlanNode child;
 
-  private Map<String, String> groupedPathMap;
+  private final List<ColumnHeader> columnHeaders;
 
   public GroupByLevelNode(
-      PlanNodeId id, PlanNode child, int[] groupByLevels, Map<String, String> groupedPathMap) {
+      PlanNodeId id,
+      PlanNode child,
+      int[] groupByLevels,
+      Map<ColumnHeader, ColumnHeader> groupedPathMap) {
     super(id);
     this.child = child;
     this.groupByLevels = groupByLevels;
     this.groupedPathMap = groupedPathMap;
-    this.columnNames = new ArrayList<>(groupedPathMap.values());
+    this.columnHeaders = groupedPathMap.values().stream().distinct().collect(Collectors.toList());
   }
 
   @Override
@@ -80,9 +92,23 @@ public class GroupByLevelNode extends ProcessNode {
     return CHILD_COUNT_NO_LIMIT;
   }
 
+  public int[] getGroupByLevels() {
+    return groupByLevels;
+  }
+
+  @Override
+  public List<ColumnHeader> getOutputColumnHeaders() {
+    return columnHeaders;
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
-    return columnNames;
+    return columnHeaders.stream().map(ColumnHeader::getColumnName).collect(Collectors.toList());
+  }
+
+  @Override
+  public List<TSDataType> getOutputColumnTypes() {
+    return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
   }
 
   @Override
@@ -97,22 +123,6 @@ public class GroupByLevelNode extends ProcessNode {
   @Override
   public void serialize(ByteBuffer byteBuffer) {}
 
-  public int[] getGroupByLevels() {
-    return groupByLevels;
-  }
-
-  public void setGroupByLevels(int[] groupByLevels) {
-    this.groupByLevels = groupByLevels;
-  }
-
-  public List<String> getColumnNames() {
-    return columnNames;
-  }
-
-  public void setColumnNames(List<String> columnNames) {
-    this.columnNames = columnNames;
-  }
-
   @TestOnly
   public Pair<String, List<String>> print() {
     String title = String.format("[GroupByLevelNode (%s)]", this.getPlanNodeId());
@@ -121,4 +131,27 @@ public class GroupByLevelNode extends ProcessNode {
     attributes.add("ColumnNames: " + this.getOutputColumnNames());
     return new Pair<>(title, attributes);
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    GroupByLevelNode that = (GroupByLevelNode) o;
+    return Objects.equals(child, that.child)
+        && Arrays.equals(groupByLevels, that.groupByLevels)
+        && Objects.equals(groupedPathMap, that.groupedPathMap);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = Objects.hash(child, groupedPathMap);
+    result = 31 * result + Arrays.hashCode(groupByLevels);
+    return result;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
index b0c6a4e93f..0a6f971126 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
@@ -19,9 +19,12 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.google.common.collect.ImmutableList;
@@ -29,12 +32,13 @@ import com.google.common.collect.ImmutableList;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 /** LimitNode is used to select top n result. It uses the default order of upstream nodes */
-public class LimitNode extends ProcessNode {
+public class LimitNode extends ProcessNode implements IOutputPlanNode {
 
   // The limit count
-  private int limit;
+  private final int limit;
   private PlanNode child;
 
   public LimitNode(PlanNodeId id, int limit) {
@@ -42,7 +46,7 @@ public class LimitNode extends ProcessNode {
     this.limit = limit;
   }
 
-  public LimitNode(PlanNodeId id, int limit, PlanNode child) {
+  public LimitNode(PlanNodeId id, PlanNode child, int limit) {
     this(id, limit);
     this.child = child;
   }
@@ -67,9 +71,19 @@ public class LimitNode extends ProcessNode {
     return ONE_CHILD;
   }
 
+  @Override
+  public List<ColumnHeader> getOutputColumnHeaders() {
+    return ((IOutputPlanNode) child).getOutputColumnHeaders();
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
-    return child.getOutputColumnNames();
+    return ((IOutputPlanNode) child).getOutputColumnNames();
+  }
+
+  @Override
+  public List<TSDataType> getOutputColumnTypes() {
+    return ((IOutputPlanNode) child).getOutputColumnTypes();
   }
 
   @Override
@@ -107,4 +121,23 @@ public class LimitNode extends ProcessNode {
     attributes.add("RowLimit: " + this.getLimit());
     return new Pair<>(title, attributes);
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    LimitNode that = (LimitNode) o;
+    return limit == that.limit && Objects.equals(child, that.child);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(limit, child);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
index f614a2cd3a..da0e0019f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
@@ -19,20 +19,24 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 /**
  * OffsetNode is used to skip top n result from upstream nodes. It uses the default order of
  * upstream nodes
  */
-public class OffsetNode extends ProcessNode {
+public class OffsetNode extends ProcessNode implements IOutputPlanNode {
 
   // The limit count
   private PlanNode child;
@@ -68,9 +72,19 @@ public class OffsetNode extends ProcessNode {
     return ONE_CHILD;
   }
 
+  @Override
+  public List<ColumnHeader> getOutputColumnHeaders() {
+    return ((IOutputPlanNode) child).getOutputColumnHeaders();
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
-    return null;
+    return ((IOutputPlanNode) child).getOutputColumnNames();
+  }
+
+  @Override
+  public List<TSDataType> getOutputColumnTypes() {
+    return ((IOutputPlanNode) child).getOutputColumnTypes();
   }
 
   @Override
@@ -100,4 +114,23 @@ public class OffsetNode extends ProcessNode {
     attributes.add("RowOffset: " + this.getOffset());
     return new Pair<>(title, attributes);
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    OffsetNode that = (OffsetNode) o;
+    return offset == that.offset && Objects.equals(child, that.child);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(child, offset);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
index 4744d1f938..33dfbf70d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
@@ -19,10 +19,13 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.google.common.collect.ImmutableList;
@@ -35,7 +38,7 @@ import java.util.List;
  * In general, the parameter in sortNode should be pushed down to the upstream operators. In our
  * optimized logical query plan, the sortNode should not appear.
  */
-public class SortNode extends ProcessNode {
+public class SortNode extends ProcessNode implements IOutputPlanNode {
 
   private PlanNode child;
 
@@ -74,9 +77,19 @@ public class SortNode extends ProcessNode {
     return ONE_CHILD;
   }
 
+  @Override
+  public List<ColumnHeader> getOutputColumnHeaders() {
+    return ((IOutputPlanNode) child).getOutputColumnHeaders();
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
-    return child.getOutputColumnNames();
+    return ((IOutputPlanNode) child).getOutputColumnNames();
+  }
+
+  @Override
+  public List<TSDataType> getOutputColumnTypes() {
+    return ((IOutputPlanNode) child).getOutputColumnTypes();
   }
 
   public OrderBy getSortOrder() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
index be36edb3e3..9afafe41a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
@@ -19,6 +19,8 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
@@ -30,18 +32,19 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 /**
- * TimeJoinOperator is responsible for join two or more TsBlock. The join algorithm is like outer
- * join by timestamp column. It will join two or more TsBlock by Timestamp column. The output result
- * of TimeJoinOperator is sorted by timestamp
+ * This node is responsible for joining two or more TsBlocks. The join algorithm is like an outer
+ * join on the timestamp column: it joins two or more TsBlocks by their timestamp column. The
+ * output result of this node is sorted by timestamp.
  */
 // TODO: define the TimeJoinMergeNode for distributed plan
-public class TimeJoinNode extends ProcessNode {
+public class TimeJoinNode extends ProcessNode implements IOutputPlanNode {
 
   // This parameter indicates the order when executing multiway merge sort.
-  private OrderBy mergeOrder;
+  private final OrderBy mergeOrder;
 
   // The policy to decide whether a row should be discarded
   // The without policy is able to be push down to the TimeJoinOperator because we can know whether
@@ -51,23 +54,18 @@ public class TimeJoinNode extends ProcessNode {
 
   private List<PlanNode> children;
 
-  // output columns' data type
-  private List<TSDataType> types;
+  private final List<ColumnHeader> columnHeaders = new ArrayList<>();
 
-  public TimeJoinNode(PlanNodeId id, OrderBy mergeOrder, FilterNullPolicy filterNullPolicy) {
+  public TimeJoinNode(PlanNodeId id, OrderBy mergeOrder) {
     super(id);
     this.mergeOrder = mergeOrder;
-    this.filterNullPolicy = filterNullPolicy;
     this.children = new ArrayList<>();
   }
 
-  public TimeJoinNode(
-      PlanNodeId id,
-      OrderBy mergeOrder,
-      FilterNullPolicy filterNullPolicy,
-      List<PlanNode> children) {
-    this(id, mergeOrder, filterNullPolicy);
+  public TimeJoinNode(PlanNodeId id, OrderBy mergeOrder, List<PlanNode> children) {
+    this(id, mergeOrder);
     this.children = children;
+    initColumnHeaders();
   }
 
   @Override
@@ -77,7 +75,7 @@ public class TimeJoinNode extends ProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new TimeJoinNode(getPlanNodeId(), this.mergeOrder, this.filterNullPolicy);
+    return new TimeJoinNode(getPlanNodeId(), this.mergeOrder);
   }
 
   @Override
@@ -85,11 +83,24 @@ public class TimeJoinNode extends ProcessNode {
     return CHILD_COUNT_NO_LIMIT;
   }
 
+  private void initColumnHeaders() {
+    for (PlanNode child : children) {
+      columnHeaders.addAll(((IOutputPlanNode) child).getOutputColumnHeaders());
+    }
+  }
+
+  @Override
+  public List<ColumnHeader> getOutputColumnHeaders() {
+    return columnHeaders;
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
-    return children.stream()
-        .flatMap(child -> child.getOutputColumnNames().stream())
-        .collect(Collectors.toList());
+    return columnHeaders.stream().map(ColumnHeader::getColumnName).collect(Collectors.toList());
+  }
+
+  public List<TSDataType> getOutputColumnTypes() {
+    return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
   }
 
   @Override
@@ -121,10 +132,6 @@ public class TimeJoinNode extends ProcessNode {
     return filterNullPolicy;
   }
 
-  public void setMergeOrder(OrderBy mergeOrder) {
-    this.mergeOrder = mergeOrder;
-  }
-
   public void setWithoutPolicy(FilterNullPolicy filterNullPolicy) {
     this.filterNullPolicy = filterNullPolicy;
   }
@@ -133,10 +140,6 @@ public class TimeJoinNode extends ProcessNode {
     return "TimeJoinNode-" + this.getPlanNodeId();
   }
 
-  public List<TSDataType> getTypes() {
-    return types;
-  }
-
   @TestOnly
   public Pair<String, List<String>> print() {
     String title = String.format("[TimeJoinNode (%s)]", this.getPlanNodeId());
@@ -147,4 +150,25 @@ public class TimeJoinNode extends ProcessNode {
             + (this.getFilterNullPolicy() == null ? "null" : this.getFilterNullPolicy()));
     return new Pair<>(title, attributes);
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    TimeJoinNode that = (TimeJoinNode) o;
+    return mergeOrder == that.mergeOrder
+        && filterNullPolicy == that.filterNullPolicy
+        && Objects.equals(children, that.children);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(mergeOrder, filterNullPolicy, children);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index d8562d144f..59dfc486ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -73,11 +73,6 @@ public class FragmentSinkNode extends SinkNode {
     return ONE_CHILD;
   }
 
-  @Override
-  public List<String> getOutputColumnNames() {
-    return null;
-  }
-
   public static FragmentSinkNode deserialize(ByteBuffer byteBuffer) {
     return null;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
index 2f5d88b91b..5350b10d17 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
@@ -20,12 +20,17 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
 
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
+import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.utils.Pair;
 
@@ -34,15 +39,17 @@ import com.google.common.collect.ImmutableList;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
- * SeriesAggregateOperator is responsible to do the aggregation calculation for one series. It will
- * read the target series and calculate the aggregation result by the aggregation digest or raw data
- * of this series.
+ * This node is responsible for the aggregation calculation of one series. It reads the target
+ * series and calculates the aggregation result from the aggregation digest or raw data of this
+ * series.
  *
  * <p>The aggregation result will be represented as a TsBlock
  *
- * <p>This operator will split data of the target series into many groups by time range and do the
+ * <p>This node will split data of the target series into many groups by time range and do the
  * aggregation calculation for each group. Each result will be one row of the result TsBlock. The
  * timestamp of each row is the start time of the time range group.
  *
@@ -50,26 +57,49 @@ import java.util.List;
  * represent the whole aggregation result of this series. And the timestamp will be 0, which is
  * meaningless.
  */
-public class SeriesAggregateScanNode extends SourceNode {
+public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNode {
 
-  // The parameter of `group by time`
-  // Its value will be null if there is no `group by time` clause,
-  private GroupByTimeParameter groupByTimeParameter;
+  // The series path and aggregation functions on this series.
+  // (Currently, we only support one series in the aggregation function)
+  private final PartialPath seriesPath;
+  private final List<AggregationType> aggregateFuncList;
 
-  // The aggregation function, which contains the function name and related series.
-  // (Currently we only support one series in the aggregation function)
-  // TODO: need consider whether it is suitable the aggregation function using FunctionExpression
-  private FunctionExpression aggregateFunc;
+  // The order to traverse the data.
+  // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
+  // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
+  private final OrderBy scanOrder;
 
-  private Filter filter;
+  private final Filter timeFilter;
 
-  private String columnName;
+  // The parameter of `group by time`
+  // Its value will be null if there is no `group by time` clause.
+  private final GroupByTimeParameter groupByTimeParameter;
+
+  private List<ColumnHeader> columnHeaders;
 
   // The id of DataRegion where the node will run
   private RegionReplicaSet regionReplicaSet;
 
-  public SeriesAggregateScanNode(PlanNodeId id) {
+  public SeriesAggregateScanNode(
+      PlanNodeId id,
+      PartialPath seriesPath,
+      List<AggregationType> aggregateFuncList,
+      OrderBy scanOrder,
+      Filter timeFilter,
+      GroupByTimeParameter groupByTimeParameter) {
     super(id);
+    this.seriesPath = seriesPath;
+    this.aggregateFuncList = aggregateFuncList;
+    this.scanOrder = scanOrder;
+    this.timeFilter = timeFilter;
+    this.groupByTimeParameter = groupByTimeParameter;
+    this.columnHeaders =
+        aggregateFuncList.stream()
+            .map(
+                functionType ->
+                    new ColumnHeader(
+                        seriesPath.getFullPath(), functionType.name(), seriesPath.getSeriesType()))
+            .collect(Collectors.toList());
   }
 
   @Override
@@ -91,19 +121,18 @@ public class SeriesAggregateScanNode extends SourceNode {
   }
 
   @Override
-  public List<String> getOutputColumnNames() {
-    return ImmutableList.of(columnName);
+  public List<ColumnHeader> getOutputColumnHeaders() {
+    return columnHeaders;
   }
 
-  public SeriesAggregateScanNode(PlanNodeId id, FunctionExpression aggregateFunc) {
-    this(id);
-    this.aggregateFunc = aggregateFunc;
+  @Override
+  public List<String> getOutputColumnNames() {
+    return columnHeaders.stream().map(ColumnHeader::getColumnName).collect(Collectors.toList());
   }
 
-  public SeriesAggregateScanNode(
-      PlanNodeId id, FunctionExpression aggregateFunc, GroupByTimeParameter groupByTimeParameter) {
-    this(id, aggregateFunc);
-    this.groupByTimeParameter = groupByTimeParameter;
+  @Override
+  public List<TSDataType> getOutputColumnTypes() {
+    return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
   }
 
   @Override
@@ -119,16 +148,6 @@ public class SeriesAggregateScanNode extends SourceNode {
     this.regionReplicaSet = regionReplicaSet;
   }
 
-  @Override
-  public String getDeviceName() {
-    return aggregateFunc.getPaths().get(0).getDevice();
-  }
-
-  @Override
-  protected String getExpressionString() {
-    return aggregateFunc.getExpressionString();
-  }
-
   @Override
   public void close() throws Exception {}
 
@@ -144,18 +163,50 @@ public class SeriesAggregateScanNode extends SourceNode {
   @Override
   public void serialize(ByteBuffer byteBuffer) {}
 
-  // This method is used when do the PredicatePushDown.
-  // The filter is not put in the constructor because the filter is only clear in the predicate
-  // push-down stage
-  public void setFilter(Filter filter) {
-    this.filter = filter;
+  public PartialPath getSeriesPath() {
+    return seriesPath;
+  }
+
+  public List<AggregationType> getAggregateFuncList() {
+    return aggregateFuncList;
   }
 
   @TestOnly
   public Pair<String, List<String>> print() {
     String title = String.format("[SeriesAggregateScanNode (%s)]", this.getPlanNodeId());
     List<String> attributes = new ArrayList<>();
-    attributes.add("AggregateFunction: " + this.getExpressionString());
+    attributes.add("AggregateFunctions: " + this.getAggregateFuncList().toString());
     return new Pair<>(title, attributes);
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    SeriesAggregateScanNode that = (SeriesAggregateScanNode) o;
+    return Objects.equals(groupByTimeParameter, that.groupByTimeParameter)
+        && Objects.equals(seriesPath, that.seriesPath)
+        && Objects.equals(
+            aggregateFuncList.stream().sorted().collect(Collectors.toList()),
+            that.aggregateFuncList.stream().sorted().collect(Collectors.toList()))
+        && scanOrder == that.scanOrder
+        && Objects.equals(timeFilter, that.timeFilter);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(
+        super.hashCode(),
+        groupByTimeParameter,
+        seriesPath,
+        aggregateFuncList.stream().sorted().collect(Collectors.toList()),
+        scanOrder,
+        timeFilter);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
index 37ba66c7fc..8cd5b3e5bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
@@ -21,10 +21,13 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.utils.Pair;
 
@@ -33,6 +36,7 @@ import com.google.common.collect.ImmutableList;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 
 /**
@@ -42,10 +46,10 @@ import java.util.Set;
  *
  * <p>Children type: no child is allowed for SeriesScanNode
  */
-public class SeriesScanNode extends SourceNode {
+public class SeriesScanNode extends SourceNode implements IOutputPlanNode {
 
   // The path of the target series which will be scanned.
-  private PartialPath seriesPath;
+  private final PartialPath seriesPath;
 
   // all the sensors in seriesPath's device of current query
   private Set<String> allSensors;
@@ -67,7 +71,7 @@ public class SeriesScanNode extends SourceNode {
   // offset for result set. The default value is 0
   private int offset;
 
-  private String columnName;
+  private ColumnHeader columnHeader;
 
   // The id of DataRegion where the node will run
   private RegionReplicaSet regionReplicaSet;
@@ -77,6 +81,15 @@ public class SeriesScanNode extends SourceNode {
     this.seriesPath = seriesPath;
   }
 
+  public SeriesScanNode(
+      PlanNodeId id, PartialPath seriesPath, Set<String> allSensors, OrderBy scanOrder) {
+    super(id);
+    this.seriesPath = seriesPath;
+    this.allSensors = allSensors;
+    this.scanOrder = scanOrder;
+    this.columnHeader = new ColumnHeader(seriesPath.getFullPath(), seriesPath.getSeriesType());
+  }
+
   public SeriesScanNode(PlanNodeId id, PartialPath seriesPath, RegionReplicaSet regionReplicaSet) {
     this(id, seriesPath);
     this.regionReplicaSet = regionReplicaSet;
@@ -105,16 +118,6 @@ public class SeriesScanNode extends SourceNode {
     this.regionReplicaSet = dataRegion;
   }
 
-  @Override
-  public String getDeviceName() {
-    return seriesPath.getDevice();
-  }
-
-  @Override
-  protected String getExpressionString() {
-    return seriesPath.getFullPath();
-  }
-
   public int getLimit() {
     return limit;
   }
@@ -123,10 +126,6 @@ public class SeriesScanNode extends SourceNode {
     return offset;
   }
 
-  public void setScanOrder(OrderBy scanOrder) {
-    this.scanOrder = scanOrder;
-  }
-
   public void setLimit(int limit) {
     this.limit = limit;
   }
@@ -153,9 +152,19 @@ public class SeriesScanNode extends SourceNode {
     return NO_CHILD_ALLOWED;
   }
 
+  @Override
+  public List<ColumnHeader> getOutputColumnHeaders() {
+    return ImmutableList.of(columnHeader);
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
-    return ImmutableList.of(columnName);
+    return ImmutableList.of(columnHeader.getColumnName());
+  }
+
+  @Override
+  public List<TSDataType> getOutputColumnTypes() {
+    return ImmutableList.of(columnHeader.getColumnType());
   }
 
   public Set<String> getAllSensors() {
@@ -204,4 +213,37 @@ public class SeriesScanNode extends SourceNode {
     attributes.add("scanOrder: " + this.getScanOrder());
     return new Pair<>(title, attributes);
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    SeriesScanNode that = (SeriesScanNode) o;
+    return limit == that.limit
+        && offset == that.offset
+        && Objects.equals(seriesPath, that.seriesPath)
+        && Objects.equals(allSensors, that.allSensors)
+        && scanOrder == that.scanOrder
+        && Objects.equals(timeFilter, that.timeFilter)
+        && Objects.equals(valueFilter, that.valueFilter);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(
+        super.hashCode(),
+        seriesPath,
+        allSensors,
+        scanOrder,
+        timeFilter,
+        valueFilter,
+        limit,
+        offset);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
index 5fdf152202..8c3f2f48d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
@@ -33,26 +33,4 @@ public abstract class SourceNode extends PlanNode implements AutoCloseable {
   public abstract RegionReplicaSet getDataRegionReplicaSet();
 
   public abstract void setDataRegionReplicaSet(RegionReplicaSet regionReplicaSet);
-
-  public abstract String getDeviceName();
-
-  protected abstract String getExpressionString();
-
-  @Override
-  public final int hashCode() {
-    return getExpressionString().hashCode();
-  }
-
-  @Override
-  public final boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-
-    if (!(o instanceof SourceNode)) {
-      return false;
-    }
-
-    return getExpressionString().equals(o.toString());
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
index f646cc9760..ab15bdd0da 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -150,11 +150,6 @@ public class InsertMultiTabletsNode extends InsertNode {
     return NO_CHILD_ALLOWED;
   }
 
-  @Override
-  public List<String> getOutputColumnNames() {
-    return null;
-  }
-
   public static InsertMultiTabletsNode deserialize(ByteBuffer byteBuffer) {
     return null;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
index a1bcb7e5fa..002523c407 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
@@ -84,11 +84,6 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
     return NO_CHILD_ALLOWED;
   }
 
-  @Override
-  public List<String> getOutputColumnNames() {
-    return null;
-  }
-
   public static InsertRowNode deserialize(ByteBuffer byteBuffer) {
     return null;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
index 92ce7684c3..68a39651f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
@@ -101,11 +101,6 @@ public class InsertRowsNode extends InsertNode {
     return NO_CHILD_ALLOWED;
   }
 
-  @Override
-  public List<String> getOutputColumnNames() {
-    return null;
-  }
-
   public static InsertRowsNode deserialize(ByteBuffer byteBuffer) {
     return null;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index e9adb763dd..c09de08cd4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -97,11 +97,6 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
     return NO_CHILD_ALLOWED;
   }
 
-  @Override
-  public List<String> getOutputColumnNames() {
-    return null;
-  }
-
   public static InsertRowsOfOneDeviceNode deserialize(ByteBuffer byteBuffer) {
     return null;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
index 6e5d4d1e3a..05ecc06367 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
@@ -135,11 +135,6 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
     return NO_CHILD_ALLOWED;
   }
 
-  @Override
-  public List<String> getOutputColumnNames() {
-    return null;
-  }
-
   public static InsertTabletNode deserialize(ByteBuffer byteBuffer) {
     return null;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/rewriter/ColumnPaginationController.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/rewriter/ColumnPaginationController.java
index f479f63747..74e2c07692 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/rewriter/ColumnPaginationController.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/rewriter/ColumnPaginationController.java
@@ -32,6 +32,9 @@ public class ColumnPaginationController {
   // series offset for result set. The default value is 0
   private final int seriesOffset;
 
+  // for ALIGN BY DEVICE / DISABLE ALIGN / GROUP BY LEVEL / LAST, the controller is disabled
+  private final boolean isDisabled;
+
   private int curLimit =
       IoTDBDescriptor.getInstance().getConfig().getMaxQueryDeduplicatedPathNum() + 1;
   private int curOffset;
@@ -39,17 +42,26 @@ public class ColumnPaginationController {
   // records the path number that the SchemaTree totally returned
   private int consumed = 0;
 
-  public ColumnPaginationController(int seriesLimit, int seriesOffset) {
+  public ColumnPaginationController(int seriesLimit, int seriesOffset, boolean isDisabled) {
     // for series limit, the default value is 0, which means no limit
     this.curLimit = seriesLimit == 0 ? this.curLimit : Math.min(seriesLimit, this.curLimit);
     this.seriesOffset = this.curOffset = seriesOffset;
+    this.isDisabled = isDisabled;
   }
 
   public int getCurLimit() {
+    if (isDisabled) {
+      return 0;
+    }
+
     return curLimit;
   }
 
   public int getCurOffset() {
+    if (isDisabled) {
+      return 0;
+    }
+
     return curOffset;
   }
 
@@ -60,11 +72,19 @@ public class ColumnPaginationController {
         > IoTDBDescriptor.getInstance().getConfig().getMaxQueryDeduplicatedPathNum()) {
       throw new PathNumOverLimitException();
     }
+    if (isDisabled) {
+      return false;
+    }
+
     return curLimit == 0;
   }
 
   public void checkIfSoffsetIsExceeded(List<ResultColumn> resultColumns)
       throws StatementAnalyzeException {
+    if (isDisabled) {
+      return;
+    }
+
     if (consumed == 0 ? seriesOffset != 0 : resultColumns.isEmpty()) {
       throw new StatementAnalyzeException(
           String.format(
@@ -74,28 +94,52 @@ public class ColumnPaginationController {
   }
 
   public void consume(int limit, int offset) {
+    if (isDisabled) {
+      return;
+    }
+
     consumed += offset;
     curOffset -= Math.min(curOffset, offset);
     curLimit -= limit;
   }
 
   public boolean hasCurOffset() {
+    if (isDisabled) {
+      return false;
+    }
+
     return curOffset != 0;
   }
 
   public boolean hasCurLimit() {
+    if (isDisabled) {
+      return true;
+    }
+
     return curLimit != 0;
   }
 
   public void decCurOffset() {
+    if (isDisabled) {
+      return;
+    }
+
     curOffset--;
   }
 
   public void decCurLimit() {
+    if (isDisabled) {
+      return;
+    }
+
     curLimit--;
   }
 
   public void incConsumed(int num) {
+    if (isDisabled) {
+      return;
+    }
+
     consumed += num;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/rewriter/WildcardsRemover.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/rewriter/WildcardsRemover.java
index 6753a3ede2..f6c7fb327a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/rewriter/WildcardsRemover.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/rewriter/WildcardsRemover.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.mpp.sql.statement.component.GroupByLevelController;
 import org.apache.iotdb.db.mpp.sql.statement.component.ResultColumn;
 import org.apache.iotdb.db.mpp.sql.statement.component.WhereCondition;
 import org.apache.iotdb.db.mpp.sql.statement.crud.AggregationQueryStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.LastQueryStatement;
 import org.apache.iotdb.db.mpp.sql.statement.crud.QueryStatement;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.query.expression.Expression;
@@ -60,20 +61,23 @@ public class WildcardsRemover {
    */
   private boolean isPrefixMatch;
 
-  public Statement rewrite(
-      Statement statement, SchemaTree schemaTree, Map<String, Set<PartialPath>> deviceIdToPathsMap)
+  public Statement rewrite(Statement statement, SchemaTree schemaTree)
       throws StatementAnalyzeException, PathNumOverLimitException {
     QueryStatement queryStatement = (QueryStatement) statement;
     this.paginationController =
         new ColumnPaginationController(
-            queryStatement.getSeriesLimit(), queryStatement.getSeriesOffset());
+            queryStatement.getSeriesLimit(),
+            queryStatement.getSeriesOffset(),
+            queryStatement.isAlignByDevice()
+                || queryStatement.disableAlign()
+                || queryStatement instanceof LastQueryStatement
+                || queryStatement.isGroupByLevel());
     this.schemaTree = schemaTree;
     this.isPrefixMatch = queryStatement.isPrefixMatchPath();
 
     if (queryStatement.getIndexType() == null) {
       // remove wildcards in SELECT clause
       removeWildcardsInSelectPaths(queryStatement);
-      deviceIdToPathsMap.putAll(queryStatement.getSelectComponent().getDeviceIdToPathsMap());
 
       // remove wildcards in WITHOUT NULL clause
       if (queryStatement.getFilterNullComponent() != null
@@ -84,7 +88,7 @@ public class WildcardsRemover {
 
     // remove wildcards in WHERE clause
     if (queryStatement.getWhereCondition() != null) {
-      removeWildcardsInQueryFilter(queryStatement, deviceIdToPathsMap);
+      removeWildcardsInQueryFilter(queryStatement);
     }
 
     return queryStatement;
@@ -170,8 +174,7 @@ public class WildcardsRemover {
     queryStatement.getFilterNullComponent().setWithoutNullColumns(resultExpressions);
   }
 
-  private void removeWildcardsInQueryFilter(
-      QueryStatement queryStatement, Map<String, Set<PartialPath>> deviceIdToPathsMap)
+  private void removeWildcardsInQueryFilter(QueryStatement queryStatement)
       throws StatementAnalyzeException {
     WhereCondition whereCondition = queryStatement.getWhereCondition();
     List<PartialPath> fromPaths = queryStatement.getFromComponent().getPrefixPaths();
@@ -180,10 +183,6 @@ public class WildcardsRemover {
     whereCondition.setQueryFilter(
         removeWildcardsInQueryFilter(whereCondition.getQueryFilter(), fromPaths, resultPaths));
     whereCondition.getQueryFilter().setPathSet(resultPaths);
-
-    for (PartialPath path : resultPaths) {
-      deviceIdToPathsMap.computeIfAbsent(path.getDevice(), k -> new HashSet<>()).add(path);
-    }
   }
 
   private QueryFilter removeWildcardsInQueryFilter(
@@ -279,6 +278,7 @@ public class WildcardsRemover {
       paginationController.consume(pair.left.size(), pair.right);
       return pair.left;
     } catch (Exception e) {
+      e.printStackTrace();
       throw new StatementAnalyzeException(
           "error occurred when removing wildcard: " + e.getMessage());
     }
@@ -292,7 +292,7 @@ public class WildcardsRemover {
     List<List<Expression>> extendedExpressions = new ArrayList<>();
     for (Expression originExpression : expressions) {
       List<Expression> actualExpressions = new ArrayList<>();
-      originExpression.removeWildcards(new WildcardsRemover(), actualExpressions);
+      originExpression.removeWildcards(this, actualExpressions);
       if (actualExpressions.isEmpty()) {
         // Let's ignore the eval of the function which has at least one non-existence series as
         // input. See IOTDB-1212: https://github.com/apache/iotdb/pull/3101
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
index 69a52ef24d..77c796ce0a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
@@ -73,6 +73,34 @@ public abstract class StatementVisitor<R, C> {
     return visitStatement(queryStatement, context);
   }
 
+  public R visitAggregationQuery(AggregationQueryStatement queryStatement, C context) {
+    return visitQuery(queryStatement, context);
+  }
+
+  public R visitFillQuery(FillQueryStatement queryStatement, C context) {
+    return visitQuery(queryStatement, context);
+  }
+
+  public R visitGroupByQuery(GroupByQueryStatement queryStatement, C context) {
+    return visitQuery(queryStatement, context);
+  }
+
+  public R visitGroupByFillQuery(GroupByFillQueryStatement queryStatement, C context) {
+    return visitQuery(queryStatement, context);
+  }
+
+  public R visitLastQuery(LastQueryStatement queryStatement, C context) {
+    return visitQuery(queryStatement, context);
+  }
+
+  public R visitUDTFQuery(UDTFQueryStatement queryStatement, C context) {
+    return visitQuery(queryStatement, context);
+  }
+
+  public R visitUDAFQuery(UDAFQueryStatement queryStatement, C context) {
+    return visitQuery(queryStatement, context);
+  }
+
   // Insert Statement
   public R visitInsert(InsertStatement insertStatement, C context) {
     return visitStatement(insertStatement, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByLevelComponent.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByLevelComponent.java
index 5b91a7bba4..6d02efdee4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByLevelComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByLevelComponent.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.sql.statement.component;
 
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.statement.StatementNode;
 
 import java.util.Map;
@@ -48,4 +49,8 @@ public class GroupByLevelComponent extends StatementNode {
   public Map<String, String> getGroupedPathMap() {
     return groupByLevelController.getGroupedPathMap();
   }
+
+  public Map<ColumnHeader, ColumnHeader> getGroupedHeaderMap() {
+    return groupByLevelController.getGroupedHeaderMap();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByLevelController.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByLevelController.java
index f00b67dab9..aec0d007ba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByLevelController.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByLevelController.java
@@ -22,11 +22,13 @@ package org.apache.iotdb.db.mpp.sql.statement.component;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.statement.crud.AggregationQueryStatement;
 import org.apache.iotdb.db.mpp.sql.statement.crud.QueryStatement;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.*;
 
@@ -49,6 +51,8 @@ public class GroupByLevelController {
   int prevSize = 0;
   /** count(root.sg.d1.s1) with level = 1 -> count(root.*.d1.s1) */
   private final Map<String, String> groupedPathMap;
+
+  private final Map<ColumnHeader, ColumnHeader> groupedHeaderMap;
   /** count(root.*.d1.s1) -> alias */
   private Map<String, String> columnToAliasMap;
   /**
@@ -63,6 +67,7 @@ public class GroupByLevelController {
     this.limitPaths = seriesLimit > 0 ? new HashSet<>() : null;
     this.offsetPaths = seriesOffset > 0 ? new HashSet<>() : null;
     this.groupedPathMap = new LinkedHashMap<>();
+    this.groupedHeaderMap = new LinkedHashMap<>();
     this.levels =
         ((AggregationQueryStatement) queryStatement).getGroupByLevelComponent().getLevels();
   }
@@ -103,11 +108,17 @@ public class GroupByLevelController {
           boolean isCountStar = countWildcardIterIndices.contains(idx++);
           String groupedPath =
               generatePartialPathByLevel(isCountStar, paths.get(0).getNodes(), levels);
+          TSDataType dataType = paths.get(0).getSeriesType();
           String rawPath = String.format("%s(%s)", functionName, paths.get(0).getFullPath());
           String pathWithFunction = String.format("%s(%s)", functionName, groupedPath);
 
+          ColumnHeader rawPathHeader =
+              new ColumnHeader(paths.get(0).getFullPath(), functionName, dataType);
+          ColumnHeader groupedPathHeader = new ColumnHeader(groupedPath, functionName, dataType);
+
           if (seriesLimit == 0 && seriesOffset == 0) {
             groupedPathMap.put(rawPath, pathWithFunction);
+            groupedHeaderMap.put(rawPathHeader, groupedPathHeader);
             checkAliasAndUpdateAliasMap(rawColumn, pathWithFunction);
           } else {
             // We cannot judge whether the path after grouping exists until we add it to set
@@ -126,6 +137,7 @@ public class GroupByLevelController {
                 limitPaths.remove(pathWithFunction);
               } else {
                 groupedPathMap.put(rawPath, pathWithFunction);
+                groupedHeaderMap.put(rawPathHeader, groupedPathHeader);
                 checkAliasAndUpdateAliasMap(rawColumn, pathWithFunction);
               }
             } else {
@@ -222,4 +234,8 @@ public class GroupByLevelController {
   public Map<String, String> getGroupedPathMap() {
     return groupedPathMap;
   }
+
+  public Map<ColumnHeader, ColumnHeader> getGroupedHeaderMap() {
+    return groupedHeaderMap;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/SelectComponent.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/SelectComponent.java
index 5662881af4..b81b3244e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/SelectComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/SelectComponent.java
@@ -24,9 +24,15 @@ import org.apache.iotdb.db.mpp.sql.statement.StatementNode;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
 import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
+import org.apache.iotdb.tsfile.read.common.Path;
 
 import java.time.ZoneId;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 /** This class maintains information of {@code SELECT} clause. */
 public class SelectComponent extends StatementNode {
@@ -148,4 +154,12 @@ public class SelectComponent extends StatementNode {
     }
     return deviceIdToPathsCache;
   }
+
+  public List<Path> getDeduplicatedPaths() {
+    Set<Path> deduplicatedPaths = new HashSet<>();
+    for (ResultColumn resultColumn : resultColumns) {
+      deduplicatedPaths.addAll(resultColumn.collectPaths());
+    }
+    return new ArrayList<>(deduplicatedPaths);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/AggregationQueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/AggregationQueryStatement.java
index 6276f2f31f..1532c07a46 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/AggregationQueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/AggregationQueryStatement.java
@@ -20,13 +20,21 @@
 package org.apache.iotdb.db.mpp.sql.statement.crud;
 
 import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
 import org.apache.iotdb.db.mpp.sql.statement.component.GroupByLevelComponent;
 import org.apache.iotdb.db.mpp.sql.statement.component.ResultColumn;
 import org.apache.iotdb.db.mpp.sql.statement.component.SelectComponent;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
 import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
 public class AggregationQueryStatement extends QueryStatement {
 
   // GROUP BY LEVEL clause
@@ -53,11 +61,26 @@ public class AggregationQueryStatement extends QueryStatement {
     return groupByLevelComponent != null && groupByLevelComponent.getLevels().length > 0;
   }
 
+  /** Groups SELECT aggregations by device, then path; names are upper-cased with Locale.ROOT. */
+  public Map<String, Map<PartialPath, Set<AggregationType>>> getDeviceNameToAggregationsMap() {
+    Map<String, Map<PartialPath, Set<AggregationType>>> aggregations = new HashMap<>();
+    for (ResultColumn resultColumn : getSelectComponent().getResultColumns()) {
+      FunctionExpression expression = (FunctionExpression) resultColumn.getExpression();
+      PartialPath path = expression.getPaths().get(0);
+      String functionName = expression.getFunctionName().toUpperCase(java.util.Locale.ROOT);
+      aggregations
+          .computeIfAbsent(path.getDevice(), key -> new HashMap<>())
+          .computeIfAbsent(path, key -> new HashSet<>())
+          .add(AggregationType.valueOf(functionName));
+    }
+    return aggregations;
+  }
+
   @Override
   public void selfCheck() {
     super.selfCheck();
 
-    if (!DisableAlign()) {
+    if (disableAlign()) {
       throw new SemanticException("AGGREGATION doesn't support disable align clause.");
     }
     checkSelectComponent(selectComponent);
@@ -87,4 +110,8 @@ public class AggregationQueryStatement extends QueryStatement {
       }
     }
   }
+
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitAggregationQuery(this, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/FillQueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/FillQueryStatement.java
index 8e693d174f..da72b8e1d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/FillQueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/FillQueryStatement.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.sql.statement.crud;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
 import org.apache.iotdb.db.mpp.sql.constant.FilterConstant;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
 import org.apache.iotdb.db.mpp.sql.statement.component.FillComponent;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 
@@ -47,7 +48,7 @@ public class FillQueryStatement extends QueryStatement {
   public void selfCheck() {
     super.selfCheck();
 
-    if (DisableAlign()) {
+    if (disableAlign()) {
       throw new SemanticException("FILL doesn't support disable align clause.");
     }
 
@@ -70,4 +71,8 @@ public class FillQueryStatement extends QueryStatement {
       throw new SemanticException("Slice query must select a single time point");
     }
   }
+
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitFillQuery(this, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/GroupByFillQueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/GroupByFillQueryStatement.java
index 1932e2dc77..b3d87aa5dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/GroupByFillQueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/GroupByFillQueryStatement.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.sql.statement.crud;
 
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
 import org.apache.iotdb.db.mpp.sql.statement.component.FillComponent;
 
 public class GroupByFillQueryStatement extends GroupByQueryStatement {
@@ -40,4 +41,8 @@ public class GroupByFillQueryStatement extends GroupByQueryStatement {
   public void setFillComponent(FillComponent fillComponent) {
     this.fillComponent = fillComponent;
   }
+
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitGroupByFillQuery(this, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/GroupByQueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/GroupByQueryStatement.java
index f778dede0d..73191009d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/GroupByQueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/GroupByQueryStatement.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.sql.statement.crud;
 
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
 import org.apache.iotdb.db.mpp.sql.statement.component.GroupByTimeComponent;
 
 public class GroupByQueryStatement extends AggregationQueryStatement {
@@ -40,4 +41,8 @@ public class GroupByQueryStatement extends AggregationQueryStatement {
   public void setGroupByTimeComponent(GroupByTimeComponent groupByTimeComponent) {
     this.groupByTimeComponent = groupByTimeComponent;
   }
+
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitGroupByQuery(this, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/LastQueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/LastQueryStatement.java
index a541f6db03..0faa99a2c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/LastQueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/LastQueryStatement.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.sql.statement.crud;
 
 import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
 import org.apache.iotdb.db.mpp.sql.statement.component.ResultColumn;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
@@ -42,7 +43,7 @@ public class LastQueryStatement extends QueryStatement {
       throw new SemanticException("Last query doesn't support align by device.");
     }
 
-    if (DisableAlign()) {
+    if (disableAlign()) {
       throw new SemanticException("Disable align cannot be applied to LAST query.");
     }
 
@@ -53,4 +54,8 @@ public class LastQueryStatement extends QueryStatement {
       }
     }
   }
+
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitLastQuery(this, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/QueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/QueryStatement.java
index 9b4336a392..3aa83a7c0d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/QueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/QueryStatement.java
@@ -25,15 +25,11 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.constant.StatementType;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
 import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
-import org.apache.iotdb.db.mpp.sql.statement.component.FromComponent;
-import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
-import org.apache.iotdb.db.mpp.sql.statement.component.ResultSetFormat;
-import org.apache.iotdb.db.mpp.sql.statement.component.SelectComponent;
-import org.apache.iotdb.db.mpp.sql.statement.component.WhereCondition;
+import org.apache.iotdb.db.mpp.sql.statement.component.*;
+import org.apache.iotdb.db.qp.constant.SQLConstant;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
+import java.util.stream.Collectors;
 
 /**
  * Base class of SELECT statement.
@@ -235,8 +231,8 @@ public class QueryStatement extends Statement {
     return resultSetFormat == ResultSetFormat.ALIGN_BY_DEVICE;
   }
 
-  public boolean DisableAlign() {
-    return resultSetFormat != ResultSetFormat.DISABLE_ALIGN;
+  public boolean disableAlign() {
+    return resultSetFormat == ResultSetFormat.DISABLE_ALIGN;
   }
 
   public boolean hasTimeSeriesGeneratingFunction() {
@@ -247,6 +243,31 @@ public class QueryStatement extends Statement {
     return selectComponent.isHasUserDefinedAggregationFunction();
   }
 
+  /** Maps each device name to the series paths used by the SELECT and WHERE clauses. */
+  public Map<String, Set<PartialPath>> getDeviceNameToPathsMap() {
+    // Copy every set: the select component may return its cached map, which must stay untouched.
+    Map<String, Set<PartialPath>> pathsByDevice = new HashMap<>();
+    getSelectComponent().getDeviceIdToPathsMap()
+        .forEach((device, paths) -> pathsByDevice.put(device, new HashSet<>(paths)));
+    if (getWhereCondition() != null) {
+      getWhereCondition().getQueryFilter().getPathSet().stream()
+          .filter(SQLConstant::isNotReservedPath)
+          .forEach(p -> pathsByDevice.computeIfAbsent(p.getDevice(), k -> new HashSet<>()).add(p));
+    }
+    return pathsByDevice;
+  }
+
+  public List<String> getSelectedPathNames() {
+    Set<String> pathSet = new HashSet<>();
+    for (ResultColumn resultColumn : getSelectComponent().getResultColumns()) {
+      pathSet.addAll(
+          resultColumn.collectPaths().stream()
+              .map(PartialPath::getFullPath)
+              .collect(Collectors.toList()));
+    }
+    return new ArrayList<>(pathSet);
+  }
+
   /** semantic check */
   public void selfCheck() {
     if (isAlignByDevice()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/UDAFQueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/UDAFQueryStatement.java
index d2475e1e35..555e15eeec 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/UDAFQueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/UDAFQueryStatement.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.sql.statement.crud;
 
 import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
 import org.apache.iotdb.db.mpp.sql.statement.component.ResultColumn;
 import org.apache.iotdb.db.mpp.sql.statement.component.SelectComponent;
 import org.apache.iotdb.db.query.expression.Expression;
@@ -48,7 +49,7 @@ public class UDAFQueryStatement extends QueryStatement {
   public void selfCheck() {
     super.selfCheck();
 
-    if (!DisableAlign()) {
+    if (disableAlign()) {
       throw new SemanticException("AGGREGATION doesn't support disable align clause.");
     }
     checkSelectComponent(selectComponent);
@@ -88,4 +89,8 @@ public class UDAFQueryStatement extends QueryStatement {
       checkEachExpression(childExp);
     }
   }
+
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitUDAFQuery(this, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/UDTFQueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/UDTFQueryStatement.java
index 79d3c75efa..b2fb9166ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/UDTFQueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/UDTFQueryStatement.java
@@ -19,9 +19,15 @@
 
 package org.apache.iotdb.db.mpp.sql.statement.crud;
 
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
+
 public class UDTFQueryStatement extends QueryStatement {
 
   public UDTFQueryStatement(QueryStatement queryStatement) {
     super(queryStatement);
   }
+
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitUDTFQuery(this, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
index cbae289964..9ca03a20b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
@@ -292,6 +292,10 @@ public class SQLConstant {
     return pathStr.equals(TIME_PATH);
   }
 
+  public static boolean isNotReservedPath(PartialPath pathStr) {
+    return !pathStr.equals(TIME_PATH);
+  }
+
   public static Set<String> getNativeFunctionNames() {
     return NATIVE_FUNCTION_NAMES;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index dfdd47fbd1..fc6a1d6e0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.unary.ConstantOperand;
@@ -85,8 +83,6 @@ public abstract class Expression {
   public abstract void constructUdfExecutors(
       Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId);
 
-  public abstract void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId);
-
   public abstract void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner);
 
   public abstract IntermediateLayer constructIntermediateLayer(
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index 4e482905ca..b706a52f5d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.Expression;
@@ -200,11 +198,6 @@ public abstract class BinaryExpression extends Expression {
     rightExpression.collectPaths(pathSet);
   }
 
-  @Override
-  public void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId) {
-    // TODO: support nested expressions
-  }
-
   @Override
   public void constructUdfExecutors(
       Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
index 1cc9043ceb..0d32cfbad5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
@@ -22,8 +22,6 @@ package org.apache.iotdb.db.query.expression.unary;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.Expression;
@@ -104,11 +102,6 @@ public class ConstantOperand extends Expression {
     // Do nothing
   }
 
-  @Override
-  public void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId) {
-    // Do nothing
-  }
-
   @Override
   public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
     // Do nothing
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index ef77e955a5..564ce5bb09 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -25,9 +25,6 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
@@ -235,14 +232,6 @@ public class FunctionExpression extends Expression {
     }
   }
 
-  @Override
-  public void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId) {
-    if (isBuiltInAggregationFunctionExpression) {
-      planNodeSet.add(new SeriesAggregateScanNode(nodeId, this));
-    }
-    // TODO: support UDF
-  }
-
   @Override
   public void constructUdfExecutors(
       Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
index b1183df457..66893a0172 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
@@ -129,11 +127,6 @@ public class LogicNotExpression extends Expression {
     expression.constructUdfExecutors(expressionName2Executor, zoneId);
   }
 
-  @Override
-  public void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId) {
-    // TODO: support LogicNotExpression
-  }
-
   @Override
   public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
     expression.updateStatisticsForMemoryAssigner(memoryAssigner);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index fca985cf79..04065daa55 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.Expression;
@@ -128,11 +126,6 @@ public class NegationExpression extends Expression {
     expression.collectPaths(pathSet);
   }
 
-  @Override
-  public void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId) {
-    // TODO: support nested expressions
-  }
-
   @Override
   public void constructUdfExecutors(
       Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index f33d353ce1..a9c1052c01 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -24,9 +24,6 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.Expression;
@@ -113,11 +110,6 @@ public class TimeSeriesOperand extends Expression {
     pathSet.add(path);
   }
 
-  @Override
-  public void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId) {
-    planNodeSet.add(new SeriesScanNode(nodeId, path));
-  }
-
   @Override
   public void constructUdfExecutors(
       Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
index b75bb0f0c0..5b553dc73f 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
@@ -171,7 +171,7 @@ public class SchemaUtils {
     return tsDataTypes;
   }
 
-  public static TSDataType getSeriesTypeByPath(MeasurementPath path, String aggregation) {
+  public static TSDataType getSeriesTypeByPath(PartialPath path, String aggregation) {
     TSDataType dataType = getAggregationType(aggregation);
     if (dataType != null) {
       return dataType;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index c28e07ae85..7ad38eb34f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
 import org.apache.iotdb.commons.partition.TimePartitionSlot;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
-import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
@@ -40,9 +40,10 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
+import com.google.common.collect.Sets;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -59,18 +60,28 @@ public class DistributionPlannerTest {
   public void TestRewriteSourceNode() throws IllegalPathException {
     QueryId queryId = new QueryId("test_query");
 
-    TimeJoinNode timeJoinNode =
-        new TimeJoinNode(
-            queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
+    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
 
     timeJoinNode.addChild(
-        new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s1")));
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
+            Sets.newHashSet("s1", "s2"),
+            OrderBy.TIMESTAMP_ASC));
     timeJoinNode.addChild(
-        new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s2")));
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
+            Sets.newHashSet("s1", "s2"),
+            OrderBy.TIMESTAMP_ASC));
     timeJoinNode.addChild(
-        new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d22.s1")));
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d22.s1", TSDataType.INT32),
+            Sets.newHashSet("s1"),
+            OrderBy.TIMESTAMP_ASC));
 
-    LimitNode root = new LimitNode(queryId.genPlanNodeId(), 10, timeJoinNode);
+    LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
 
     Analysis analysis = constructAnalysis();
 
@@ -83,18 +94,28 @@ public class DistributionPlannerTest {
   @Test
   public void TestAddExchangeNode() throws IllegalPathException {
     QueryId queryId = new QueryId("test_query");
-    TimeJoinNode timeJoinNode =
-        new TimeJoinNode(
-            queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
+    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
 
     timeJoinNode.addChild(
-        new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s1")));
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
+            Sets.newHashSet("s1", "s2"),
+            OrderBy.TIMESTAMP_ASC));
     timeJoinNode.addChild(
-        new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s2")));
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
+            Sets.newHashSet("s1", "s2"),
+            OrderBy.TIMESTAMP_ASC));
     timeJoinNode.addChild(
-        new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d22.s1")));
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d22.s1", TSDataType.INT32),
+            Sets.newHashSet("s1"),
+            OrderBy.TIMESTAMP_ASC));
 
-    LimitNode root = new LimitNode(queryId.genPlanNodeId(), 10, timeJoinNode);
+    LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
 
     Analysis analysis = constructAnalysis();
 
@@ -108,18 +129,28 @@ public class DistributionPlannerTest {
   @Test
   public void TestSplitFragment() throws IllegalPathException {
     QueryId queryId = new QueryId("test_query");
-    TimeJoinNode timeJoinNode =
-        new TimeJoinNode(
-            queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
+    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
 
     timeJoinNode.addChild(
-        new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s1")));
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
+            Sets.newHashSet("s1", "s2"),
+            OrderBy.TIMESTAMP_ASC));
     timeJoinNode.addChild(
-        new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s2")));
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
+            Sets.newHashSet("s1", "s2"),
+            OrderBy.TIMESTAMP_ASC));
     timeJoinNode.addChild(
-        new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d22.s1")));
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d22.s1", TSDataType.INT32),
+            Sets.newHashSet("s1"),
+            OrderBy.TIMESTAMP_ASC));
 
-    LimitNode root = new LimitNode(queryId.genPlanNodeId(), 10, timeJoinNode);
+    LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
 
     Analysis analysis = constructAnalysis();
 
@@ -136,18 +167,28 @@ public class DistributionPlannerTest {
   @Test
   public void TestParallelPlan() throws IllegalPathException {
     QueryId queryId = new QueryId("test_query");
-    TimeJoinNode timeJoinNode =
-        new TimeJoinNode(
-            queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
+    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
 
     timeJoinNode.addChild(
-        new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s1")));
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
+            Sets.newHashSet("s1", "s2"),
+            OrderBy.TIMESTAMP_ASC));
     timeJoinNode.addChild(
-        new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s2")));
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
+            Sets.newHashSet("s1", "s2"),
+            OrderBy.TIMESTAMP_ASC));
     timeJoinNode.addChild(
-        new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d333.s1")));
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d333.s1", TSDataType.INT32),
+            Sets.newHashSet("s1"),
+            OrderBy.TIMESTAMP_ASC));
 
-    LimitNode root = new LimitNode(queryId.genPlanNodeId(), 10, timeJoinNode);
+    LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
 
     Analysis analysis = constructAnalysis();
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
index 87bea82d7a..baef5cba99 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.analyze.Analyzer;
 import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.sql.planner.LogicalPlanner;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
@@ -41,8 +42,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
@@ -51,68 +50,17 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.iotdb.db.mpp.sql.plan.QueryLogicalPlanUtil.querySQLs;
+import static org.apache.iotdb.db.mpp.sql.plan.QueryLogicalPlanUtil.sqlToPlanMap;
 import static org.junit.Assert.fail;
 
 public class LogicalPlannerTest {
 
-  LogicalPlanPrinter planPrinter = new LogicalPlanPrinter();
-
-  @Before
-  public void setUp() {}
-
-  @Test
-  @Ignore
-  public void rawDataQueryTest() {
-    PlanNode root =
-        parseSQLToPlanNode(
-            "SELECT s1,s2 FROM root.sg1.d1 WHERE time > 10 and s2 > 100 WITHOUT NULL ANY(s1) LIMIT 1 OFFSET 10");
-    System.out.println(planPrinter.print(root));
-    // TODO: replace all paths to full paths
-    Assert.assertEquals(
-        "[OffsetNode (7)]\n"
-            + " │   RowOffset: 10\n"
-            + " └─[LimitNode (6)]\n"
-            + "    │   RowLimit: 1\n"
-            + "    └─[FilterNullNode (5)]\n"
-            + "       │   FilterNullPolicy: CONTAINS_NULL\n"
-            + "       │   FilterNullColumnNames: [s1]\n"
-            + "       └─[FilterNode (4)]\n"
-            + "          │   QueryFilter: [and [time>10][s2>100]]\n"
-            + "          └─[TimeJoinNode (3)]\n"
-            + "             │   MergeOrder: TIMESTAMP_ASC\n"
-            + "             │   FilterNullPolicy: null\n"
-            + "             └─[SeriesScanNode (1)]\n"
-            + "                │   SeriesPath: s1\n"
-            + "                │   scanOrder: TIMESTAMP_ASC\n"
-            + "               [SeriesScanNode (2)]\n"
-            + "                │   SeriesPath: s2\n"
-            + "                │   scanOrder: TIMESTAMP_ASC\n",
-        planPrinter.print(root));
-  }
-
   @Test
-  @Ignore
-  public void aggregationQueryTest() {
-    PlanNode root =
-        parseSQLToPlanNode(
-            "SELECT sum(s1), avg(s2) FROM root.sg1.d1 WHERE time > 10 LIMIT 1 OFFSET 10");
-    System.out.println(planPrinter.print(root));
-    // TODO: replace all paths to full paths
-    Assert.assertEquals(
-        "[OffsetNode (6)]\n"
-            + " │   RowOffset: 10\n"
-            + " └─[LimitNode (5)]\n"
-            + "    │   RowLimit: 1\n"
-            + "    └─[FilterNode (4)]\n"
-            + "       │   QueryFilter: [time>10]\n"
-            + "       └─[TimeJoinNode (3)]\n"
-            + "          │   MergeOrder: TIMESTAMP_ASC\n"
-            + "          │   FilterNullPolicy: null\n"
-            + "          └─[SeriesAggregateScanNode (2)]\n"
-            + "             │   AggregateFunction: avg(s2)\n"
-            + "            [SeriesAggregateScanNode (1)]\n"
-            + "             │   AggregateFunction: sum(s1)\n",
-        planPrinter.print(root));
+  public void queryPlanTest() {
+    for (String sql : querySQLs) {
+      Assert.assertEquals(sqlToPlanMap.get(sql), parseSQLToPlanNode(sql));
+    }
   }
 
   @Test
@@ -418,26 +366,6 @@ public class LogicalPlannerTest {
     }
   }
 
-  private PlanNode parseSQLToPlanNode(String sql) {
-    PlanNode planNode = null;
-    try {
-      Statement statement =
-          StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset());
-      MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
-      // TODO: do analyze after implementing ISchemaFetcher and IPartitionFetcher
-      //      Analyzer analyzer = new Analyzer(context);
-      //      Analysis analysis = analyzer.analyze(statement);
-      Analysis analysis = new Analysis();
-      analysis.setStatement(statement);
-      LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
-      planNode = planner.plan(analysis).getRootNode();
-    } catch (Exception e) {
-      e.printStackTrace();
-      fail();
-    }
-    return planNode;
-  }
-
   @Test
   public void authorTest() throws AuthException {
 
@@ -583,4 +511,21 @@ public class LogicalPlannerTest {
     Assert.assertNotNull(authorNode);
     Assert.assertEquals(AuthorOperator.AuthorType.LIST_ROLE_USERS, authorNode.getAuthorType());
   }
+
+  private PlanNode parseSQLToPlanNode(String sql) {
+    PlanNode planNode = null;
+    try {
+      Statement statement =
+          StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset());
+      MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
+      Analyzer analyzer = new Analyzer(context);
+      Analysis analysis = analyzer.analyze(statement);
+      LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
+      planNode = planner.plan(analysis).getRootNode();
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
+    return planNode;
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryLogicalPlanUtil.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryLogicalPlanUtil.java
new file mode 100644
index 0000000000..97ab163c68
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryLogicalPlanUtil.java
@@ -0,0 +1,567 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.plan;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.ColumnHeader;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNullNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.OffsetNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
+import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.ValueFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+
+import org.apache.commons.compress.utils.Sets;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/** This class generates logical plans for test cases of the query statements. */
+public class QueryLogicalPlanUtil {
+
+  // test cases of query statement
+  public static final List<String> querySQLs = new ArrayList<>();
+
+  // key: query statement; value: expected logical plan
+  public static final Map<String, PlanNode> sqlToPlanMap = new HashMap<>();
+
+  public static final Map<String, MeasurementPath> schemaMap = new HashMap<>();
+
+  static {
+    try {
+      schemaMap.put("root.sg.d1.s1", new MeasurementPath("root.sg.d1.s1", TSDataType.INT32));
+      schemaMap.put("root.sg.d1.s2", new MeasurementPath("root.sg.d1.s2", TSDataType.INT32));
+      schemaMap.put("root.sg.d2.s1", new MeasurementPath("root.sg.d2.s1", TSDataType.INT32));
+      schemaMap.put("root.sg.d2.s2", new MeasurementPath("root.sg.d2.s2", TSDataType.INT32));
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+    }
+  }
+
+  /* 1. Raw Data Query */
+  static {
+    String sql =
+        "SELECT s1 FROM root.sg.* WHERE time > 100 and s2 > 10 "
+            + "ORDER BY TIME DESC WITHOUT NULL ANY LIMIT 100 OFFSET 100 SLIMIT 1 SOFFSET 1";
+
+    List<PlanNode> sourceNodeList = new ArrayList<>();
+    sourceNodeList.add(
+        new SeriesScanNode(
+            new PlanNodeId("test_query_0"),
+            schemaMap.get("root.sg.d1.s2"),
+            Sets.newHashSet("s2"),
+            OrderBy.TIMESTAMP_DESC));
+    sourceNodeList.add(
+        new SeriesScanNode(
+            new PlanNodeId("test_query_1"),
+            schemaMap.get("root.sg.d2.s1"),
+            Sets.newHashSet("s1", "s2"),
+            OrderBy.TIMESTAMP_DESC));
+    sourceNodeList.add(
+        new SeriesScanNode(
+            new PlanNodeId("test_query_2"),
+            schemaMap.get("root.sg.d2.s2"),
+            Sets.newHashSet("s1", "s2"),
+            OrderBy.TIMESTAMP_DESC));
+
+    TimeJoinNode timeJoinNode =
+        new TimeJoinNode(new PlanNodeId("test_query_3"), OrderBy.TIMESTAMP_DESC, sourceNodeList);
+
+    IExpression leftExpression =
+        new SingleSeriesExpression(
+            schemaMap.get("root.sg.d1.s2"),
+            FilterFactory.and(ValueFilter.gt(10), TimeFilter.gt(100)));
+    IExpression rightExpression =
+        new SingleSeriesExpression(
+            schemaMap.get("root.sg.d2.s2"),
+            FilterFactory.and(ValueFilter.gt(10), TimeFilter.gt(100)));
+    IExpression expression = BinaryExpression.and(leftExpression, rightExpression);
+
+    FilterNode filterNode =
+        new FilterNode(
+            new PlanNodeId("test_query_4"),
+            timeJoinNode,
+            expression,
+            Collections.singletonList("root.sg.d2.s1"));
+
+    FilterNullNode filterNullNode =
+        new FilterNullNode(
+            new PlanNodeId("test_query_5"),
+            filterNode,
+            FilterNullPolicy.CONTAINS_NULL,
+            new ArrayList<>());
+
+    LimitNode limitNode = new LimitNode(new PlanNodeId("test_query_6"), filterNullNode, 100);
+    OffsetNode offsetNode = new OffsetNode(new PlanNodeId("test_query_7"), limitNode, 100);
+
+    querySQLs.add(sql);
+    sqlToPlanMap.put(sql, offsetNode);
+  }
+
+  /* 2. Raw Data Query (align by device) */
+  static {
+    String sql =
+        "SELECT * FROM root.sg.* WHERE time > 100 and s1 > 10 "
+            + "ORDER BY TIME DESC WITHOUT NULL ANY LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
+
+    List<PlanNode> sourceNodeList1 = new ArrayList<>();
+    List<PlanNode> sourceNodeList2 = new ArrayList<>();
+    sourceNodeList1.add(
+        new SeriesScanNode(
+            new PlanNodeId("test_query_0"),
+            schemaMap.get("root.sg.d1.s1"),
+            Sets.newHashSet("s1", "s2"),
+            OrderBy.TIMESTAMP_DESC));
+    sourceNodeList1.add(
+        new SeriesScanNode(
+            new PlanNodeId("test_query_1"),
+            schemaMap.get("root.sg.d1.s2"),
+            Sets.newHashSet("s1", "s2"),
+            OrderBy.TIMESTAMP_DESC));
+    sourceNodeList2.add(
+        new SeriesScanNode(
+            new PlanNodeId("test_query_2"),
+            schemaMap.get("root.sg.d2.s1"),
+            Sets.newHashSet("s1", "s2"),
+            OrderBy.TIMESTAMP_DESC));
+    sourceNodeList2.add(
+        new SeriesScanNode(
+            new PlanNodeId("test_query_3"),
+            schemaMap.get("root.sg.d2.s2"),
+            Sets.newHashSet("s1", "s2"),
+            OrderBy.TIMESTAMP_DESC));
+
+    TimeJoinNode timeJoinNode1 =
+        new TimeJoinNode(new PlanNodeId("test_query_4"), OrderBy.TIMESTAMP_DESC, sourceNodeList1);
+    TimeJoinNode timeJoinNode2 =
+        new TimeJoinNode(new PlanNodeId("test_query_5"), OrderBy.TIMESTAMP_DESC, sourceNodeList2);
+
+    IExpression leftExpression =
+        new SingleSeriesExpression(
+            schemaMap.get("root.sg.d1.s1"),
+            FilterFactory.and(ValueFilter.gt(10), TimeFilter.gt(100)));
+    IExpression rightExpression =
+        new SingleSeriesExpression(
+            schemaMap.get("root.sg.d2.s1"),
+            FilterFactory.and(ValueFilter.gt(10), TimeFilter.gt(100)));
+    IExpression expression = BinaryExpression.and(leftExpression, rightExpression);
+
+    List<String> outputColumnNames =
+        Arrays.asList("root.sg.d1.s1", "root.sg.d1.s2", "root.sg.d2.s1", "root.sg.d2.s2");
+    FilterNode filterNode1 =
+        new FilterNode(
+            new PlanNodeId("test_query_6"), timeJoinNode1, expression, outputColumnNames);
+    FilterNode filterNode2 =
+        new FilterNode(
+            new PlanNodeId("test_query_7"), timeJoinNode2, expression, outputColumnNames);
+
+    DeviceMergeNode deviceMergeNode =
+        new DeviceMergeNode(new PlanNodeId("test_query_8"), OrderBy.TIMESTAMP_DESC);
+    deviceMergeNode.addChildDeviceNode("root.sg.d1", filterNode1);
+    deviceMergeNode.addChildDeviceNode("root.sg.d2", filterNode2);
+
+    FilterNullNode filterNullNode =
+        new FilterNullNode(
+            new PlanNodeId("test_query_9"),
+            deviceMergeNode,
+            FilterNullPolicy.CONTAINS_NULL,
+            new ArrayList<>());
+
+    LimitNode limitNode = new LimitNode(new PlanNodeId("test_query_10"), filterNullNode, 100);
+    OffsetNode offsetNode = new OffsetNode(new PlanNodeId("test_query_11"), limitNode, 100);
+
+    querySQLs.add(sql);
+    sqlToPlanMap.put(sql, offsetNode);
+  }
+
+  /* 3. Aggregation Query (without value filter) */
+  static {
+    String sql =
+        "SELECT count(s1), max_value(s2), last_value(s1) FROM root.sg.* WHERE time > 100 "
+            + "GROUP BY LEVEL = 1 ORDER BY TIME DESC WITHOUT NULL ANY LIMIT 100 OFFSET 100";
+
+    List<PlanNode> sourceNodeList = new ArrayList<>();
+    List<AggregationType> aggregationTypeList1 =
+        Arrays.asList(AggregationType.COUNT, AggregationType.LAST_VALUE);
+    List<AggregationType> aggregationTypeList2 =
+        Collections.singletonList(AggregationType.MAX_VALUE);
+    Filter timeFilter = TimeFilter.gt(100);
+    sourceNodeList.add(
+        new SeriesAggregateScanNode(
+            new PlanNodeId("test_query_0"),
+            schemaMap.get("root.sg.d1.s1"),
+            aggregationTypeList1,
+            OrderBy.TIMESTAMP_DESC,
+            timeFilter,
+            null));
+    sourceNodeList.add(
+        new SeriesAggregateScanNode(
+            new PlanNodeId("test_query_1"),
+            schemaMap.get("root.sg.d1.s2"),
+            aggregationTypeList2,
+            OrderBy.TIMESTAMP_DESC,
+            timeFilter,
+            null));
+    sourceNodeList.add(
+        new SeriesAggregateScanNode(
+            new PlanNodeId("test_query_2"),
+            schemaMap.get("root.sg.d2.s1"),
+            aggregationTypeList1,
+            OrderBy.TIMESTAMP_DESC,
+            timeFilter,
+            null));
+    sourceNodeList.add(
+        new SeriesAggregateScanNode(
+            new PlanNodeId("test_query_3"),
+            schemaMap.get("root.sg.d2.s2"),
+            aggregationTypeList2,
+            OrderBy.TIMESTAMP_DESC,
+            timeFilter,
+            null));
+
+    TimeJoinNode timeJoinNode =
+        new TimeJoinNode(new PlanNodeId("test_query_4"), OrderBy.TIMESTAMP_DESC, sourceNodeList);
+
+    int[] groupByLevels = new int[] {1};
+    Map<ColumnHeader, ColumnHeader> groupedHeaderMap = new HashMap<>();
+    groupedHeaderMap.put(
+        new ColumnHeader("root.sg.d1.s1", AggregationType.COUNT.name(), TSDataType.INT32),
+        new ColumnHeader("root.sg.*.s1", AggregationType.COUNT.name(), TSDataType.INT32));
+    groupedHeaderMap.put(
+        new ColumnHeader("root.sg.d2.s1", AggregationType.COUNT.name(), TSDataType.INT32),
+        new ColumnHeader("root.sg.*.s1", AggregationType.COUNT.name(), TSDataType.INT32));
+    groupedHeaderMap.put(
+        new ColumnHeader("root.sg.d1.s2", AggregationType.MAX_VALUE.name(), TSDataType.INT32),
+        new ColumnHeader("root.sg.*.s2", AggregationType.MAX_VALUE.name(), TSDataType.INT32));
+    groupedHeaderMap.put(
+        new ColumnHeader("root.sg.d2.s2", AggregationType.MAX_VALUE.name(), TSDataType.INT32),
+        new ColumnHeader("root.sg.*.s2", AggregationType.MAX_VALUE.name(), TSDataType.INT32));
+    groupedHeaderMap.put(
+        new ColumnHeader("root.sg.d1.s1", AggregationType.LAST_VALUE.name(), TSDataType.INT32),
+        new ColumnHeader("root.sg.*.s1", AggregationType.LAST_VALUE.name(), TSDataType.INT32));
+    groupedHeaderMap.put(
+        new ColumnHeader("root.sg.d2.s1", AggregationType.LAST_VALUE.name(), TSDataType.INT32),
+        new ColumnHeader("root.sg.*.s1", AggregationType.LAST_VALUE.name(), TSDataType.INT32));
+    GroupByLevelNode groupByLevelNode =
+        new GroupByLevelNode(
+            new PlanNodeId("test_query_5"), timeJoinNode, groupByLevels, groupedHeaderMap);
+
+    FilterNullNode filterNullNode =
+        new FilterNullNode(
+            new PlanNodeId("test_query_6"),
+            groupByLevelNode,
+            FilterNullPolicy.CONTAINS_NULL,
+            new ArrayList<>());
+
+    LimitNode limitNode = new LimitNode(new PlanNodeId("test_query_7"), filterNullNode, 100);
+    OffsetNode offsetNode = new OffsetNode(new PlanNodeId("test_query_8"), limitNode, 100);
+
+    querySQLs.add(sql);
+    sqlToPlanMap.put(sql, offsetNode);
+  }
+
+  /* 4. Aggregation Query (without value filter and align by device) */
+  static {
+    String sql =
+        "SELECT count(s1), max_value(s2), last_value(s1) FROM root.sg.* WHERE time > 100 "
+            + "ORDER BY TIME DESC WITHOUT NULL ANY LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
+    // Leaf to root: scans, per-device time joins, device merge, null filter, limit, offset.
+    Filter timeGt100 = TimeFilter.gt(100);
+    List<AggregationType> s1Aggregations =
+        Arrays.asList(AggregationType.COUNT, AggregationType.LAST_VALUE);
+    List<AggregationType> s2Aggregations =
+        Collections.singletonList(AggregationType.MAX_VALUE);
+    List<PlanNode> d1Scans =
+        new ArrayList<>(
+            Arrays.asList(
+                new SeriesAggregateScanNode(
+                    new PlanNodeId("test_query_0"),
+                    schemaMap.get("root.sg.d1.s1"),
+                    s1Aggregations,
+                    OrderBy.TIMESTAMP_DESC,
+                    timeGt100,
+                    null),
+                new SeriesAggregateScanNode(
+                    new PlanNodeId("test_query_1"),
+                    schemaMap.get("root.sg.d1.s2"),
+                    s2Aggregations,
+                    OrderBy.TIMESTAMP_DESC,
+                    timeGt100,
+                    null)));
+    List<PlanNode> d2Scans =
+        new ArrayList<>(
+            Arrays.asList(
+                new SeriesAggregateScanNode(
+                    new PlanNodeId("test_query_2"),
+                    schemaMap.get("root.sg.d2.s1"),
+                    s1Aggregations,
+                    OrderBy.TIMESTAMP_DESC,
+                    timeGt100,
+                    null),
+                new SeriesAggregateScanNode(
+                    new PlanNodeId("test_query_3"),
+                    schemaMap.get("root.sg.d2.s2"),
+                    s2Aggregations,
+                    OrderBy.TIMESTAMP_DESC,
+                    timeGt100,
+                    null)));
+    // One time join per device over that device's aggregate scans.
+    TimeJoinNode d1Join =
+        new TimeJoinNode(new PlanNodeId("test_query_4"), OrderBy.TIMESTAMP_DESC, d1Scans);
+    TimeJoinNode d2Join =
+        new TimeJoinNode(new PlanNodeId("test_query_5"), OrderBy.TIMESTAMP_DESC, d2Scans);
+    // Each per-device subtree is attached to the merge node under its device string.
+    DeviceMergeNode mergedDevices =
+        new DeviceMergeNode(new PlanNodeId("test_query_6"), OrderBy.TIMESTAMP_DESC);
+    mergedDevices.addChildDeviceNode("root.sg.d1", d1Join);
+    mergedDevices.addChildDeviceNode("root.sg.d2", d2Join);
+    // The statement's WITHOUT NULL ANY clause is paired with CONTAINS_NULL in this fixture.
+    FilterNullNode withoutNulls =
+        new FilterNullNode(
+            new PlanNodeId("test_query_7"),
+            mergedDevices,
+            FilterNullPolicy.CONTAINS_NULL,
+            new ArrayList<>());
+    // The offset node wraps the limit node, so it is the root of the expected plan.
+    LimitNode limited = new LimitNode(new PlanNodeId("test_query_8"), withoutNulls, 100);
+    OffsetNode planRoot = new OffsetNode(new PlanNodeId("test_query_9"), limited, 100);
+    // Register the statement together with the plan expected for it.
+    querySQLs.add(sql);
+    sqlToPlanMap.put(sql, planRoot);
+  }
+
+  /* 5. Aggregation Query (with value filter) */
+  static {
+    String sql =
+        "SELECT count(s1), max_value(s2), last_value(s1) FROM root.sg.* WHERE time > 100 and s2 > 10 "
+            + "GROUP BY LEVEL = 1 ORDER BY TIME DESC WITHOUT NULL ANY LIMIT 100 OFFSET 100";
+    // Leaf to root: scans, join, filter, aggregate, group by level, null filter, limit, offset.
+    List<PlanNode> sourceNodeList = new ArrayList<>();
+    sourceNodeList.add(
+        new SeriesScanNode(
+            new PlanNodeId("test_query_0"),
+            schemaMap.get("root.sg.d1.s1"),
+            Sets.newHashSet("s1", "s2"),
+            OrderBy.TIMESTAMP_DESC));
+    sourceNodeList.add(
+        new SeriesScanNode(
+            new PlanNodeId("test_query_1"),
+            schemaMap.get("root.sg.d1.s2"),
+            Sets.newHashSet("s1", "s2"),
+            OrderBy.TIMESTAMP_DESC));
+    sourceNodeList.add(
+        new SeriesScanNode(
+            new PlanNodeId("test_query_2"),
+            schemaMap.get("root.sg.d2.s1"),
+            Sets.newHashSet("s1", "s2"),
+            OrderBy.TIMESTAMP_DESC));
+    sourceNodeList.add(
+        new SeriesScanNode(
+            new PlanNodeId("test_query_3"),
+            schemaMap.get("root.sg.d2.s2"),
+            Sets.newHashSet("s1", "s2"),
+            OrderBy.TIMESTAMP_DESC));
+    // One join over all four series, in the same descending time order as the scans.
+    TimeJoinNode timeJoinNode =
+        new TimeJoinNode(new PlanNodeId("test_query_4"), OrderBy.TIMESTAMP_DESC, sourceNodeList);
+    // The fixture states the predicate once per device: s2 > 10 and time > 100 on d1 AND on d2.
+    IExpression leftExpression =
+        new SingleSeriesExpression(
+            schemaMap.get("root.sg.d1.s2"),
+            FilterFactory.and(ValueFilter.gt(10), TimeFilter.gt(100)));
+    IExpression rightExpression =
+        new SingleSeriesExpression(
+            schemaMap.get("root.sg.d2.s2"),
+            FilterFactory.and(ValueFilter.gt(10), TimeFilter.gt(100)));
+    IExpression expression = BinaryExpression.and(leftExpression, rightExpression);
+    List<String> outputColumnNames =
+        Arrays.asList("root.sg.d1.s1", "root.sg.d1.s2", "root.sg.d2.s1", "root.sg.d2.s2");
+    FilterNode filterNode =
+        new FilterNode(new PlanNodeId("test_query_5"), timeJoinNode, expression, outputColumnNames);
+    // Aggregations expected per series: COUNT and LAST_VALUE on s1, MAX_VALUE on s2.
+    Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
+    aggregateFuncMap.put(
+        schemaMap.get("root.sg.d1.s1"),
+        Sets.newHashSet(AggregationType.COUNT, AggregationType.LAST_VALUE));
+    aggregateFuncMap.put(
+        schemaMap.get("root.sg.d1.s2"), Sets.newHashSet(AggregationType.MAX_VALUE));
+    aggregateFuncMap.put(
+        schemaMap.get("root.sg.d2.s1"),
+        Sets.newHashSet(AggregationType.COUNT, AggregationType.LAST_VALUE));
+    aggregateFuncMap.put(
+        schemaMap.get("root.sg.d2.s2"), Sets.newHashSet(AggregationType.MAX_VALUE));
+    AggregateNode aggregateNode =
+        new AggregateNode(new PlanNodeId("test_query_6"), filterNode, aggregateFuncMap, null);
+    // For level 1, each per-device column header is mapped to its root.sg.* counterpart.
+    int[] groupByLevels = new int[] {1};
+    Map<ColumnHeader, ColumnHeader> groupedHeaderMap = new HashMap<>();
+    groupedHeaderMap.put(
+        new ColumnHeader("root.sg.d1.s1", AggregationType.COUNT.name(), TSDataType.INT32),
+        new ColumnHeader("root.sg.*.s1", AggregationType.COUNT.name(), TSDataType.INT32));
+    groupedHeaderMap.put(
+        new ColumnHeader("root.sg.d2.s1", AggregationType.COUNT.name(), TSDataType.INT32),
+        new ColumnHeader("root.sg.*.s1", AggregationType.COUNT.name(), TSDataType.INT32));
+    groupedHeaderMap.put(
+        new ColumnHeader("root.sg.d1.s2", AggregationType.MAX_VALUE.name(), TSDataType.INT32),
+        new ColumnHeader("root.sg.*.s2", AggregationType.MAX_VALUE.name(), TSDataType.INT32));
+    groupedHeaderMap.put(
+        new ColumnHeader("root.sg.d2.s2", AggregationType.MAX_VALUE.name(), TSDataType.INT32),
+        new ColumnHeader("root.sg.*.s2", AggregationType.MAX_VALUE.name(), TSDataType.INT32));
+    groupedHeaderMap.put(
+        new ColumnHeader("root.sg.d1.s1", AggregationType.LAST_VALUE.name(), TSDataType.INT32),
+        new ColumnHeader("root.sg.*.s1", AggregationType.LAST_VALUE.name(), TSDataType.INT32));
+    groupedHeaderMap.put(
+        new ColumnHeader("root.sg.d2.s1", AggregationType.LAST_VALUE.name(), TSDataType.INT32),
+        new ColumnHeader("root.sg.*.s1", AggregationType.LAST_VALUE.name(), TSDataType.INT32));
+    GroupByLevelNode groupByLevelNode =
+        new GroupByLevelNode(
+            new PlanNodeId("test_query_7"), aggregateNode, groupByLevels, groupedHeaderMap);
+    // Every node of this plan has its own id: group by level is 7, so the null filter is 8.
+    FilterNullNode filterNullNode =
+        new FilterNullNode(
+            new PlanNodeId("test_query_8"),
+            groupByLevelNode,
+            FilterNullPolicy.CONTAINS_NULL,
+            new ArrayList<>());
+    // The offset node wraps the limit node, so it is the root of the expected plan.
+    LimitNode limitNode = new LimitNode(new PlanNodeId("test_query_9"), filterNullNode, 100);
+    OffsetNode offsetNode = new OffsetNode(new PlanNodeId("test_query_10"), limitNode, 100);
+    // Register the statement together with the plan expected for it.
+    querySQLs.add(sql);
+    sqlToPlanMap.put(sql, offsetNode);
+  }
+
+  /* 6. Aggregation Query (with value filter and align by device) */
+  static {
+    String sql =
+        "SELECT count(s1), max_value(s2), last_value(s1) FROM root.sg.* WHERE time > 100 and s2 > 10 "
+            + "ORDER BY TIME DESC WITHOUT NULL ANY LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
+    // Per device: scans, join, filter, aggregate. Then: device merge, null filter, limit, offset.
+    List<PlanNode> d1Scans =
+        new ArrayList<>(
+            Arrays.asList(
+                new SeriesScanNode(
+                    new PlanNodeId("test_query_0"),
+                    schemaMap.get("root.sg.d1.s1"),
+                    Sets.newHashSet("s1", "s2"),
+                    OrderBy.TIMESTAMP_DESC),
+                new SeriesScanNode(
+                    new PlanNodeId("test_query_1"),
+                    schemaMap.get("root.sg.d1.s2"),
+                    Sets.newHashSet("s1", "s2"),
+                    OrderBy.TIMESTAMP_DESC)));
+    List<PlanNode> d2Scans =
+        new ArrayList<>(
+            Arrays.asList(
+                new SeriesScanNode(
+                    new PlanNodeId("test_query_2"),
+                    schemaMap.get("root.sg.d2.s1"),
+                    Sets.newHashSet("s1", "s2"),
+                    OrderBy.TIMESTAMP_DESC),
+                new SeriesScanNode(
+                    new PlanNodeId("test_query_3"),
+                    schemaMap.get("root.sg.d2.s2"),
+                    Sets.newHashSet("s1", "s2"),
+                    OrderBy.TIMESTAMP_DESC)));
+    // One time join per device over that device's scans.
+    TimeJoinNode d1Join =
+        new TimeJoinNode(new PlanNodeId("test_query_4"), OrderBy.TIMESTAMP_DESC, d1Scans);
+    TimeJoinNode d2Join =
+        new TimeJoinNode(new PlanNodeId("test_query_5"), OrderBy.TIMESTAMP_DESC, d2Scans);
+    // s2 > 10 combined with time > 100, stated once for d1.s2 and once for d2.s2.
+    IExpression d1Predicate =
+        new SingleSeriesExpression(
+            schemaMap.get("root.sg.d1.s2"),
+            FilterFactory.and(ValueFilter.gt(10), TimeFilter.gt(100)));
+    IExpression d2Predicate =
+        new SingleSeriesExpression(
+            schemaMap.get("root.sg.d2.s2"),
+            FilterFactory.and(ValueFilter.gt(10), TimeFilter.gt(100)));
+    IExpression bothDevices = BinaryExpression.and(d1Predicate, d2Predicate);
+    // NOTE(review): both filter nodes get the same two-device predicate and all four columns.
+    List<String> filterOutputColumns =
+        Arrays.asList("root.sg.d1.s1", "root.sg.d1.s2", "root.sg.d2.s1", "root.sg.d2.s2");
+    FilterNode d1Filtered =
+        new FilterNode(
+            new PlanNodeId("test_query_6"), d1Join, bothDevices, filterOutputColumns);
+    FilterNode d2Filtered =
+        new FilterNode(
+            new PlanNodeId("test_query_7"), d2Join, bothDevices, filterOutputColumns);
+    // Aggregations are split by device: each map only holds that device's series.
+    Map<PartialPath, Set<AggregationType>> d1Aggregations = new HashMap<>();
+    d1Aggregations.put(
+        schemaMap.get("root.sg.d1.s1"),
+        Sets.newHashSet(AggregationType.COUNT, AggregationType.LAST_VALUE));
+    d1Aggregations.put(
+        schemaMap.get("root.sg.d1.s2"), Sets.newHashSet(AggregationType.MAX_VALUE));
+    Map<PartialPath, Set<AggregationType>> d2Aggregations = new HashMap<>();
+    d2Aggregations.put(
+        schemaMap.get("root.sg.d2.s1"),
+        Sets.newHashSet(AggregationType.COUNT, AggregationType.LAST_VALUE));
+    d2Aggregations.put(
+        schemaMap.get("root.sg.d2.s2"), Sets.newHashSet(AggregationType.MAX_VALUE));
+    AggregateNode d1Aggregated =
+        new AggregateNode(new PlanNodeId("test_query_8"), d1Filtered, d1Aggregations, null);
+    AggregateNode d2Aggregated =
+        new AggregateNode(new PlanNodeId("test_query_9"), d2Filtered, d2Aggregations, null);
+    // Each per-device subtree is attached to the merge node under its device string.
+    DeviceMergeNode mergedDevices =
+        new DeviceMergeNode(new PlanNodeId("test_query_10"), OrderBy.TIMESTAMP_DESC);
+    mergedDevices.addChildDeviceNode("root.sg.d1", d1Aggregated);
+    mergedDevices.addChildDeviceNode("root.sg.d2", d2Aggregated);
+    // The statement's WITHOUT NULL ANY clause is paired with CONTAINS_NULL in this fixture.
+    FilterNullNode withoutNulls =
+        new FilterNullNode(
+            new PlanNodeId("test_query_11"),
+            mergedDevices,
+            FilterNullPolicy.CONTAINS_NULL,
+            new ArrayList<>());
+    // The offset node wraps the limit node, so it is the root of the expected plan.
+    LimitNode limited = new LimitNode(new PlanNodeId("test_query_12"), withoutNulls, 100);
+    OffsetNode planRoot = new OffsetNode(new PlanNodeId("test_query_13"), limited, 100);
+    // Register the statement together with the plan expected for it.
+    querySQLs.add(sql);
+    sqlToPlanMap.put(sql, planRoot);
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/BinaryExpression.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/BinaryExpression.java
index 7b660d6570..8ae271f089 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/BinaryExpression.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/BinaryExpression.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.tsfile.read.expression.IBinaryExpression;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 public abstract class BinaryExpression implements IBinaryExpression, Serializable {
 
@@ -83,6 +84,25 @@ public abstract class BinaryExpression implements IBinaryExpression, Serializabl
     public String toString() {
       return "[" + left + " && " + right + "]";
     }
+
+    /** Equal when {@code o} has this object's runtime class and the same string form. */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      // NOTE(review): equality is only as precise as the "[left && right]" rendering.
+      return toString().equals(o.toString());
+    }
+
+    /** Hashes the string form, so objects that are equal above share a hash code. */
+    @Override
+    public int hashCode() {
+      return Objects.hash(toString());
+    }
   }
 
   protected static class OrExpression extends BinaryExpression {
@@ -129,5 +149,24 @@ public abstract class BinaryExpression implements IBinaryExpression, Serializabl
     public String toString() {
       return "[" + left + " || " + right + "]";
     }
+
+    /** Equal when {@code o} has this object's runtime class and the same string form. */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      // NOTE(review): equality is only as precise as the "[left || right]" rendering.
+      return toString().equals(o.toString());
+    }
+
+    /** Hashes the string form, so objects that are equal above share a hash code. */
+    @Override
+    public int hashCode() {
+      return Objects.hash(toString());
+    }
   }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/GlobalTimeExpression.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/GlobalTimeExpression.java
index 3c07c061f0..ac2fd776c0 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/GlobalTimeExpression.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/GlobalTimeExpression.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.tsfile.read.expression.IUnaryExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 public class GlobalTimeExpression implements IUnaryExpression, Serializable {
 
@@ -58,4 +59,23 @@ public class GlobalTimeExpression implements IUnaryExpression, Serializable {
   public String toString() {
     return "[" + this.filter + "]";
   }
+
+  /** Equal when {@code o} has this object's runtime class and the same string form. */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    // NOTE(review): equality is only as precise as the "[filter]" rendering of the filter.
+    return toString().equals(o.toString());
+  }
+
+  /** Hashes the string form, so objects that are equal above share a hash code. */
+  @Override
+  public int hashCode() {
+    return Objects.hash(toString());
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/SingleSeriesExpression.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/SingleSeriesExpression.java
index f7b603beeb..1b7fb1888c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/SingleSeriesExpression.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/impl/SingleSeriesExpression.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.tsfile.read.expression.IUnaryExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 public class SingleSeriesExpression implements IUnaryExpression, Serializable {
 
@@ -69,4 +70,23 @@ public class SingleSeriesExpression implements IUnaryExpression, Serializable {
   public void setSeriesPath(Path seriesPath) {
     this.seriesPath = seriesPath;
   }
+
+  /** Equal when {@code o} has this object's runtime class and the same string form. */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    // NOTE(review): relies on toString(), which is defined outside this hunk; confirm it.
+    return Objects.equals(toString(), o.toString());
+  }
+
+  /** Hashes the string form, so objects that are equal above share a hash code. */
+  @Override
+  public int hashCode() {
+    return Objects.hash(toString());
+  }
 }