Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/12 13:26:03 UTC

[iotdb] branch master updated: [IOTDB-3079] Implementation of complete LogicalPlanner (#5858)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3eaf0547c3 [IOTDB-3079] Implementation of complete LogicalPlanner (#5858)
3eaf0547c3 is described below

commit 3eaf0547c35d778f6418615d8ddec51861590d78
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Thu May 12 21:25:58 2022 +0800

    [IOTDB-3079] Implementation of complete LogicalPlanner (#5858)
---
 .../apache/iotdb/db/metadata/utils/MetaUtils.java  |  30 +
 .../apache/iotdb/db/mpp/plan/analyze/Analysis.java |  26 +-
 .../apache/iotdb/db/mpp/plan/analyze/Analyzer.java |  59 +-
 .../db/mpp/plan/analyze/ClusterSchemaFetcher.java  |   1 +
 .../db/mpp/plan/analyze/ExpressionAnalyzer.java    |  16 +
 .../db/mpp/plan/analyze/FakeSchemaFetcherImpl.java |  16 +-
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |   8 +-
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    | 433 ++++++++--
 .../iotdb/db/mpp/plan/planner/LogicalPlanner.java  | 160 +++-
 .../planner/plan/node/process/DeviceViewNode.java  |  88 ++-
 .../plan/node/process/GroupByLevelNode.java        |   4 +-
 .../source/AlignedSeriesAggregationScanNode.java   |  11 +-
 .../plan/node/source/AlignedSeriesScanNode.java    |   4 +
 .../node/source/SeriesAggregationScanNode.java     |  52 +-
 .../planner/plan/node/source/SeriesScanNode.java   |  40 +-
 .../plan/parameter/AggregationDescriptor.java      |  10 +-
 .../plan/parameter/GroupByTimeParameter.java       |   8 +
 .../org/apache/iotdb/db/utils/SchemaUtils.java     |  52 ++
 .../db/mpp/plan/plan/DistributionPlannerTest.java  |  15 -
 .../mpp/plan/plan/FragmentInstanceSerdeTest.java   |  16 +-
 .../db/mpp/plan/plan/QueryLogicalPlanUtil.java     | 878 +++++++++++++--------
 .../node/process/AggregationNodeSerdeTest.java     |   1 -
 .../plan/node/process/DeviceViewNodeSerdeTest.java |   4 +-
 .../node/process/GroupByLevelNodeSerdeTest.java    |   3 -
 .../plan/plan/node/process/LimitNodeSerdeTest.java |   2 -
 .../plan/node/process/OffsetNodeSerdeTest.java     |   2 -
 .../plan/plan/node/process/SortNodeSerdeTest.java  |   2 -
 .../plan/node/process/TimeJoinNodeSerdeTest.java   |   3 -
 .../source/SeriesAggregationScanNodeSerdeTest.java |   1 -
 .../plan/node/source/SeriesScanNodeSerdeTest.java  |   2 -
 30 files changed, 1410 insertions(+), 537 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
index e231c174d0..ad2778960c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.metadata.mnode.IMNode;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.tsfile.read.common.Path;
 
 import java.util.ArrayList;
@@ -189,4 +190,33 @@ public class MetaUtils {
     }
     return alignedPathToAggrIndexesMap;
   }
+
+  public static Map<PartialPath, List<AggregationDescriptor>> groupAlignedAggregations(
+      Map<PartialPath, List<AggregationDescriptor>> pathToAggregations) {
+    Map<PartialPath, List<AggregationDescriptor>> result = new HashMap<>();
+    List<AggregationDescriptor> alignedPathAggregations = new ArrayList<>();
+    AlignedPath alignedPath = null;
+    for (PartialPath path : pathToAggregations.keySet()) {
+      MeasurementPath measurementPath = (MeasurementPath) path;
+      if (!measurementPath.isUnderAlignedEntity()) {
+        result
+            .computeIfAbsent(measurementPath, key -> new ArrayList<>())
+            .addAll(pathToAggregations.get(path));
+        alignedPath = null;
+        alignedPathAggregations.clear();
+      } else {
+        if (alignedPath == null || !alignedPath.equals(measurementPath.getDevice())) {
+          alignedPath = new AlignedPath(measurementPath);
+          alignedPathAggregations.addAll(pathToAggregations.get(path));
+        } else {
+          alignedPath.addMeasurement(measurementPath);
+          alignedPathAggregations.addAll(pathToAggregations.get(path));
+        }
+      }
+    }
+    if (alignedPath != null) {
+      result.put(alignedPath, alignedPathAggregations);
+    }
+    return result;
+  }
 }
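
A note for readers, not part of the commit: planAggregationSource in LogicalPlanBuilder.java
below calls groupAlignedAggregations once per device, so each call sees either non-aligned
measurement paths or the measurements of a single aligned device. A hypothetical input/output
pair for one aligned device (paths invented for illustration):

    // input : { root.sg.a.s1 -> [count], root.sg.a.s2 -> [count] }  (s1, s2 under aligned device root.sg.a)
    // output: { AlignedPath(root.sg.a, [s1, s2]) -> [count(s1), count(s2)] }
    // Non-aligned MeasurementPath keys pass through unchanged, each keeping its own descriptor list.
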
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
index 3c1f8b3b64..892d24cf77 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
@@ -57,11 +57,11 @@ public class Analysis {
   // map from device name to series/aggregation under this device
   private Map<String, Set<Expression>> sourceExpressions;
 
-  //
+  // expressions of the output columns to be calculated
   private Set<Expression> selectExpressions;
 
   // all aggregations that need to be calculated
-  private Map<String, Set<Expression>> AggregationExpressions;
+  private Map<String, Set<Expression>> aggregationExpressions;
 
   // map from grouped path name to list of input aggregation in `GROUP BY LEVEL` clause
   private Map<Expression, Set<Expression>> groupByLevelExpressions;
@@ -77,17 +77,22 @@ public class Analysis {
 
   private Expression queryFilter;
 
+  // map from device name to query filter under this device (used in ALIGN BY DEVICE)
   private Map<String, Expression> deviceToQueryFilter;
 
   // indicate is there a value filter
   private boolean hasValueFilter = false;
 
-  // a global time filter used in `initQueryDataSource`
+  // a global time filter used in `initQueryDataSource` and filter push down
   private Filter globalTimeFilter;
 
   // header of result dataset
   private DatasetHeader respDatasetHeader;
 
+  // e.g. if [s1, s2, s3] is queried but only [s1, s3] exist under device1, then device1 -> [1, 3];
+  // s1 maps to index 1 rather than 0 because the device name is the first output column
+  private Map<String, List<Integer>> deviceToMeasurementIndexesMap;
+
   public Analysis() {}
 
   public List<TRegionReplicaSet> getPartitionInfo(PartialPath seriesPath, Filter timefilter) {
@@ -173,11 +178,11 @@ public class Analysis {
   }
 
   public Map<String, Set<Expression>> getAggregationExpressions() {
-    return AggregationExpressions;
+    return aggregationExpressions;
   }
 
   public void setAggregationExpressions(Map<String, Set<Expression>> aggregationExpressions) {
-    AggregationExpressions = aggregationExpressions;
+    this.aggregationExpressions = aggregationExpressions;
   }
 
   public Map<Expression, Set<Expression>> getGroupByLevelExpressions() {
@@ -204,7 +209,7 @@ public class Analysis {
     this.fillDescriptor = fillDescriptor;
   }
 
-  public boolean isHasValueFilter() {
+  public boolean hasValueFilter() {
     return hasValueFilter;
   }
 
@@ -235,4 +240,13 @@ public class Analysis {
   public void setGroupByTimeParameter(GroupByTimeParameter groupByTimeParameter) {
     this.groupByTimeParameter = groupByTimeParameter;
   }
+
+  public void setDeviceToMeasurementIndexesMap(
+      Map<String, List<Integer>> deviceToMeasurementIndexesMap) {
+    this.deviceToMeasurementIndexesMap = deviceToMeasurementIndexesMap;
+  }
+
+  public Map<String, List<Integer>> getDeviceToMeasurementIndexesMap() {
+    return deviceToMeasurementIndexesMap;
+  }
 }
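
To make the indexing comment above concrete, a small hypothetical example of what
deviceToMeasurementIndexesMap holds for an ALIGN BY DEVICE query (device and measurement
names invented; java.util imports assumed):

    // select s1, s2, s3 from root.sg.* align by device
    // output columns: Device, s1, s2, s3   (the Device column occupies index 0)
    // root.sg.device1 only has s1 and s3; root.sg.device2 has all three
    Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
    deviceToMeasurementIndexesMap.put("root.sg.device1", Arrays.asList(1, 3));
    deviceToMeasurementIndexesMap.put("root.sg.device2", Arrays.asList(1, 2, 3));
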
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index 6217f23185..f52ba6fd77 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -79,6 +79,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -144,7 +145,7 @@ public class Analyzer {
         }
 
         List<Pair<Expression, String>> outputExpressions;
-        Set<Expression> selectExpressions = new HashSet<>();
+        Set<Expression> selectExpressions = new LinkedHashSet<>();
         Map<String, Set<Expression>> sourceExpressions = new HashMap<>();
         // Example 1: select s1, s1 + s2 as t, udf(udf(s1)) from root.sg.d1
         //   outputExpressions: [<root.sg.d1.s1,null>, <root.sg.d1.s1 + root.sg.d1.s2,t>,
@@ -175,9 +176,34 @@ public class Analyzer {
         // a set that contains all measurement names,
         Set<String> measurementSet = new HashSet<>();
         if (queryStatement.isAlignByDevice()) {
+          Map<String, Set<String>> deviceToMeasurementsMap = new HashMap<>();
           outputExpressions =
               analyzeFrom(
-                  queryStatement, schemaTree, deviceSchemaInfos, selectExpressions, measurementSet);
+                  queryStatement,
+                  schemaTree,
+                  deviceSchemaInfos,
+                  selectExpressions,
+                  deviceToMeasurementsMap,
+                  measurementSet);
+
+          Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
+          List<String> allMeasurements =
+              outputExpressions.stream()
+                  .map(Pair::getLeft)
+                  .map(Expression::getExpressionString)
+                  .distinct()
+                  .collect(Collectors.toList());
+          for (String deviceName : deviceToMeasurementsMap.keySet()) {
+            List<String> measurementsUnderDevice =
+                new ArrayList<>(deviceToMeasurementsMap.get(deviceName));
+            List<Integer> indexes = new ArrayList<>();
+            for (String measurement : measurementsUnderDevice) {
+              indexes.add(
+                  allMeasurements.indexOf(measurement) + 1); // add 1 to skip the device column
+            }
+            deviceToMeasurementIndexesMap.put(deviceName, indexes);
+          }
+          analysis.setDeviceToMeasurementIndexesMap(deviceToMeasurementIndexesMap);
         } else {
           outputExpressions = analyzeSelect(queryStatement, schemaTree);
           selectExpressions =
@@ -329,10 +355,15 @@ public class Analyzer {
         SchemaTree schemaTree,
         List<DeviceSchemaInfo> allDeviceSchemaInfos,
         Set<Expression> selectExpressions,
+        Map<String, Set<String>> deviceToMeasurementsMap,
         Set<String> measurementSet) {
       // device path patterns in FROM clause
       List<PartialPath> devicePatternList = queryStatement.getFromComponent().getPrefixPaths();
 
+      // a list of measurement name with alias (null if alias not exist)
+      List<Pair<Expression, String>> measurementWithAliasList =
+          getAllMeasurements(queryStatement, measurementSet);
+
       // a list contains all selected paths
       List<MeasurementPath> allSelectedPaths = new ArrayList<>();
       for (PartialPath devicePattern : devicePatternList) {
@@ -370,10 +401,6 @@ public class Analyzer {
       // if not, throw a SemanticException
       measurementNameToPathsMap.values().forEach(this::checkDataTypeConsistencyInAlignByDevice);
 
-      // a list of measurement name with alias (null if alias not exist)
-      List<Pair<Expression, String>> measurementWithAliasList =
-          getAllMeasurements(queryStatement, measurementSet);
-
       // apply SLIMIT & SOFFSET and set outputExpressions & selectExpressions
       List<Pair<Expression, String>> outputExpressions = new ArrayList<>();
       ColumnPaginationController paginationController =
@@ -399,6 +426,9 @@ public class Analyzer {
                       measurementAliasPair.left, measurementPath);
               typeProvider.setType(tmpExpression.getExpressionString(), dataType);
               selectExpressions.add(tmpExpression);
+              deviceToMeasurementsMap
+                  .computeIfAbsent(measurementPath.getDevice(), key -> new LinkedHashSet<>())
+                  .add(measurementAliasPair.left.getExpressionString());
             }
             paginationController.consumeLimit();
           } else {
@@ -431,6 +461,9 @@ public class Analyzer {
                       expressionWithoutAlias, measurementPath);
               typeProvider.setType(tmpExpression.getExpressionString(), dataType);
               selectExpressions.add(tmpExpression);
+              deviceToMeasurementsMap
+                  .computeIfAbsent(measurementPath.getDevice(), key -> new LinkedHashSet<>())
+                  .add(expressionWithoutAlias.getExpressionString());
             }
             paginationController.consumeLimit();
           } else {
@@ -459,6 +492,9 @@ public class Analyzer {
                         measurementAliasPair.left, measurementPath);
                 typeProvider.setType(tmpExpression.getExpressionString(), dataType);
                 selectExpressions.add(tmpExpression);
+                deviceToMeasurementsMap
+                    .computeIfAbsent(measurementPath.getDevice(), key -> new LinkedHashSet<>())
+                    .add(replacedMeasurement.getExpressionString());
               }
               paginationController.consumeLimit();
             } else {
@@ -531,7 +567,7 @@ public class Analyzer {
         sourceExpressions
             .computeIfAbsent(
                 ExpressionAnalyzer.getDeviceNameInSourceExpression(sourceExpression),
-                key -> new HashSet<>())
+                key -> new LinkedHashSet<>())
             .add(sourceExpression);
       }
     }
@@ -668,7 +704,12 @@ public class Analyzer {
         QueryStatement queryStatement, List<Pair<Expression, String>> outputExpressions) {
       boolean isIgnoreTimestamp =
           queryStatement.isAggregationQuery() && !queryStatement.isGroupByTime();
-      List<ColumnHeader> columnHeaders =
+      List<ColumnHeader> columnHeaders = new ArrayList<>();
+      if (queryStatement.isAlignByDevice()) {
+        columnHeaders.add(new ColumnHeader(HeaderConstant.COLUMN_DEVICE, TSDataType.TEXT, null));
+        typeProvider.setType(HeaderConstant.COLUMN_DEVICE, TSDataType.TEXT);
+      }
+      columnHeaders.addAll(
           outputExpressions.stream()
               .map(
                   expressionWithAlias -> {
@@ -676,7 +717,7 @@ public class Analyzer {
                     String alias = expressionWithAlias.right;
                     return new ColumnHeader(columnName, typeProvider.getType(columnName), alias);
                   })
-              .collect(Collectors.toList());
+              .collect(Collectors.toList()));
       return new DatasetHeader(columnHeaders, isIgnoreTimestamp);
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index e6f24a546e..61b6c3a99c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -317,6 +317,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
       }
       createAlignedTimeSeriesStatement.setEncodings(encodings);
       createAlignedTimeSeriesStatement.setCompressors(compressors);
+      createAlignedTimeSeriesStatement.setAliasList(null);
 
       executeCreateStatement(createAlignedTimeSeriesStatement);
     } else {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
index 324ad66949..0578ce582e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
@@ -762,6 +762,22 @@ public class ExpressionAnalyzer {
     }
   }
 
+  /** Checks whether the expression is an arithmetic expression, a logical expression, or a UDF call. Returns true if so. */
+  public static boolean checkIsNeedTransform(Expression expression) {
+    if (expression instanceof BinaryExpression) {
+      return true;
+    } else if (expression instanceof UnaryExpression) {
+      return true;
+    } else if (expression instanceof FunctionExpression) {
+      return !expression.isBuiltInAggregationFunctionExpression();
+    } else if (expression instanceof TimeSeriesOperand) {
+      return false;
+    } else {
+      throw new IllegalArgumentException(
+          "unsupported expression type: " + expression.getExpressionType());
+    }
+  }
+
   /////////////////////////////////////////////////////////////////////////////////////////////////
   // Method can only be used in source expression
   /////////////////////////////////////////////////////////////////////////////////////////////////
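
For orientation, the classification implemented by checkIsNeedTransform, restated as a short
illustration (expressions written in query syntax; not part of the patch):

    // s1           (TimeSeriesOperand)                   -> false, plain series output
    // s1 + s2, -s1 (BinaryExpression / UnaryExpression)  -> true,  needs a TransformNode
    // udf(s1)      (FunctionExpression, non-built-in)    -> true
    // count(s1)    (built-in aggregation)                -> false
    // planTransform in LogicalPlanBuilder.java only inserts a TransformNode when at least
    // one select expression returns true here.
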
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
index 68b6e687f4..c0046625a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
@@ -60,23 +60,29 @@ public class FakeSchemaFetcherImpl implements ISchemaFetcher {
     SchemaNode sg = new SchemaInternalNode("sg");
     root.addChild("sg", sg);
 
-    SchemaEntityNode d1 = new SchemaEntityNode("d1");
-    sg.addChild("d1", d1);
-
     SchemaMeasurementNode s1 =
         new SchemaMeasurementNode("s1", new MeasurementSchema("s1", TSDataType.INT32));
-    d1.addChild("s1", s1);
     SchemaMeasurementNode s2 =
-        new SchemaMeasurementNode("s2", new MeasurementSchema("s1", TSDataType.INT32));
+        new SchemaMeasurementNode("s2", new MeasurementSchema("s2", TSDataType.DOUBLE));
+    SchemaMeasurementNode s3 =
+        new SchemaMeasurementNode("s3", new MeasurementSchema("s3", TSDataType.BOOLEAN));
+    SchemaMeasurementNode s4 =
+        new SchemaMeasurementNode("s4", new MeasurementSchema("s4", TSDataType.TEXT));
     s2.setAlias("status");
+
+    SchemaEntityNode d1 = new SchemaEntityNode("d1");
+    sg.addChild("d1", d1);
+    d1.addChild("s1", s1);
     d1.addChild("s2", s2);
     d1.addAliasChild("status", s2);
+    d1.addChild("s3", s3);
 
     SchemaEntityNode d2 = new SchemaEntityNode("d2");
     sg.addChild("d2", d2);
     d2.addChild("s1", s1);
     d2.addChild("s2", s2);
     d2.addAliasChild("status", s2);
+    d2.addChild("s4", s4);
 
     SchemaEntityNode a = new SchemaEntityNode("a");
     a.setAligned(true);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index ac215e0727..3273638f04 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -402,7 +402,13 @@ public class LocalExecutionPlanner {
           node.getChildren().stream()
               .map(child -> child.accept(this, context))
               .collect(Collectors.toList());
-      return new DeviceViewOperator(operatorContext, node.getDevices(), children, null, null);
+      List<List<Integer>> deviceColumnIndex =
+          node.getDevices().stream()
+              .map(deviceName -> node.getDeviceToMeasurementIndexesMap().get(deviceName))
+              .collect(Collectors.toList());
+      List<TSDataType> outputColumnTypes = getOutputColumnTypes(node, context.getTypeProvider());
+      return new DeviceViewOperator(
+          operatorContext, node.getDevices(), children, deviceColumnIndex, outputColumnTypes);
     }
 
     @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index 62a8b7f83e..b0c1c48441 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -21,9 +21,13 @@ package org.apache.iotdb.db.mpp.plan.planner;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.utils.MetaUtils;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.plan.analyze.ExpressionAnalyzer;
+import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.CountSchemaMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.DevicesCountNode;
@@ -34,17 +38,40 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchS
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesCountNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNullNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTimeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FilterNullParameter;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.query.expression.multi.FunctionExpression;
+import org.apache.iotdb.db.utils.SchemaUtils;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
+import org.apache.commons.lang.Validate;
+
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -65,79 +92,385 @@ public class LogicalPlanBuilder {
     return root;
   }
 
-  public LogicalPlanBuilder planRawDataQuerySource(
-      Map<String, Set<Expression>> deviceNameToPathsMap,
-      OrderBy scanOrder,
-      boolean isAlignByDevice) {
-    Map<String, List<PlanNode>> deviceNameToSourceNodesMap = new HashMap<>();
+  public LogicalPlanBuilder withNewRoot(PlanNode newRoot) {
+    this.root = newRoot;
+    return this;
+  }
 
-    for (Map.Entry<String, Set<Expression>> entry : deviceNameToPathsMap.entrySet()) {
-      String deviceName = entry.getKey();
-      Set<String> allSensors =
-          entry.getValue().stream()
+  public LogicalPlanBuilder planRawDataSource(
+      Map<String, Set<Expression>> deviceNameToSourceExpressions,
+      OrderBy scanOrder,
+      Filter timeFilter) {
+    List<PlanNode> sourceNodeList = new ArrayList<>();
+    for (Set<Expression> sourceExpressionList :
+        deviceNameToSourceExpressions.values()) { // for each device
+      List<PartialPath> selectedPaths =
+          sourceExpressionList.stream()
               .map(expression -> ((TimeSeriesOperand) expression).getPath())
-              .map(PartialPath::getMeasurement)
-              .collect(Collectors.toSet());
-      for (Expression expression : entry.getValue()) {
-        PartialPath path = ((TimeSeriesOperand) expression).getPath();
-        deviceNameToSourceNodesMap
-            .computeIfAbsent(deviceName, k -> new ArrayList<>())
-            .add(
-                new SeriesScanNode(
-                    context.getQueryId().genPlanNodeId(),
-                    (MeasurementPath) path,
-                    allSensors,
-                    scanOrder));
+              .collect(Collectors.toList());
+      List<PartialPath> groupedPaths = MetaUtils.groupAlignedPaths(selectedPaths);
+      for (PartialPath path : groupedPaths) {
+        if (path instanceof MeasurementPath) { // non-aligned series
+          SeriesScanNode seriesScanNode =
+              new SeriesScanNode(
+                  context.getQueryId().genPlanNodeId(), (MeasurementPath) path, scanOrder);
+          seriesScanNode.setTimeFilter(timeFilter);
+          sourceNodeList.add(seriesScanNode);
+        } else if (path instanceof AlignedPath) { // aligned series
+          AlignedSeriesScanNode alignedSeriesScanNode =
+              new AlignedSeriesScanNode(
+                  context.getQueryId().genPlanNodeId(), (AlignedPath) path, scanOrder);
+          alignedSeriesScanNode.setTimeFilter(timeFilter);
+          sourceNodeList.add(alignedSeriesScanNode);
+        } else {
+          throw new IllegalArgumentException("unexpected path type");
+        }
       }
     }
 
-    if (isAlignByDevice) {
-      planDeviceMerge(deviceNameToSourceNodesMap, scanOrder);
-    } else {
-      planTimeJoin(deviceNameToSourceNodesMap, scanOrder);
+    this.root = convergeWithTimeJoin(sourceNodeList, scanOrder);
+    return this;
+  }
+
+  public LogicalPlanBuilder planAggregationSource(
+      Map<String, Set<Expression>> deviceNameToSourceExpressions,
+      OrderBy scanOrder,
+      Filter timeFilter,
+      GroupByTimeParameter groupByTimeParameter,
+      Map<String, Set<Expression>> aggregationExpressions,
+      Map<Expression, Set<Expression>> groupByLevelExpressions,
+      TypeProvider typeProvider) {
+    AggregationStep curStep =
+        (groupByLevelExpressions != null
+                || (groupByTimeParameter != null && groupByTimeParameter.hasOverlap()))
+            ? AggregationStep.PARTIAL
+            : AggregationStep.SINGLE;
+
+    List<PlanNode> sourceNodeList = new ArrayList<>();
+    for (Set<Expression> sourceExpressionList :
+        deviceNameToSourceExpressions.values()) { // for each device
+      Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new HashMap<>();
+      Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new HashMap<>();
+      for (Expression sourceExpression : sourceExpressionList) {
+        AggregationType aggregationFunction =
+            AggregationType.valueOf(
+                ((FunctionExpression) sourceExpression).getFunctionName().toUpperCase());
+        AggregationDescriptor aggregationDescriptor =
+            new AggregationDescriptor(
+                aggregationFunction, curStep, sourceExpression.getExpressions());
+        if (curStep.isOutputPartial()) {
+          updateTypeProviderByPartialAggregation(aggregationDescriptor, typeProvider);
+        }
+        PartialPath selectPath =
+            ((TimeSeriesOperand) sourceExpression.getExpressions().get(0)).getPath();
+        if (SchemaUtils.isConsistentWithScanOrder(aggregationFunction, scanOrder)) {
+          ascendingAggregations
+              .computeIfAbsent(selectPath, key -> new ArrayList<>())
+              .add(aggregationDescriptor);
+        } else {
+          descendingAggregations
+              .computeIfAbsent(selectPath, key -> new ArrayList<>())
+              .add(aggregationDescriptor);
+        }
+      }
+
+      Map<PartialPath, List<AggregationDescriptor>> groupedAscendingAggregations =
+          MetaUtils.groupAlignedAggregations(ascendingAggregations);
+      Map<PartialPath, List<AggregationDescriptor>> groupedDescendingAggregations =
+          MetaUtils.groupAlignedAggregations(descendingAggregations);
+      for (Map.Entry<PartialPath, List<AggregationDescriptor>> pathAggregationsEntry :
+          groupedAscendingAggregations.entrySet()) {
+        sourceNodeList.add(
+            createAggregationScanNode(
+                pathAggregationsEntry.getKey(),
+                pathAggregationsEntry.getValue(),
+                scanOrder,
+                groupByTimeParameter,
+                timeFilter));
+      }
+      for (Map.Entry<PartialPath, List<AggregationDescriptor>> pathAggregationsEntry :
+          groupedDescendingAggregations.entrySet()) {
+        sourceNodeList.add(
+            createAggregationScanNode(
+                pathAggregationsEntry.getKey(),
+                pathAggregationsEntry.getValue(),
+                scanOrder,
+                groupByTimeParameter,
+                timeFilter));
+      }
     }
 
+    if (curStep.isOutputPartial()) {
+      if (groupByTimeParameter != null && groupByTimeParameter.hasOverlap()) {
+        curStep =
+            groupByLevelExpressions != null ? AggregationStep.INTERMEDIATE : AggregationStep.FINAL;
+        this.root =
+            createGroupByTimeNode(
+                sourceNodeList, aggregationExpressions, groupByTimeParameter, curStep);
+
+        if (groupByLevelExpressions != null) {
+          curStep = AggregationStep.FINAL;
+          this.root =
+              createGroupByTLevelNode(this.root.getChildren(), groupByLevelExpressions, curStep);
+        }
+      } else {
+        if (groupByLevelExpressions != null) {
+          curStep = AggregationStep.FINAL;
+          this.root = createGroupByTLevelNode(sourceNodeList, groupByLevelExpressions, curStep);
+        }
+      }
+    } else {
+      this.root = convergeWithTimeJoin(sourceNodeList, scanOrder);
+    }
     return this;
   }
 
-  public void planTimeJoin(
-      Map<String, List<PlanNode>> deviceNameToSourceNodesMap, OrderBy mergeOrder) {
-    List<PlanNode> sourceNodes =
-        deviceNameToSourceNodesMap.entrySet().stream()
-            .flatMap(entry -> entry.getValue().stream())
-            .collect(Collectors.toList());
-    this.root = convergeWithTimeJoin(sourceNodes, mergeOrder);
+  private void updateTypeProviderByPartialAggregation(
+      AggregationDescriptor aggregationDescriptor, TypeProvider typeProvider) {
+    List<AggregationType> splitAggregations =
+        SchemaUtils.splitPartialAggregation(aggregationDescriptor.getAggregationType());
+    PartialPath path =
+        ((TimeSeriesOperand) aggregationDescriptor.getInputExpressions().get(0)).getPath();
+    for (AggregationType aggregationType : splitAggregations) {
+      String functionName = aggregationType.toString().toLowerCase();
+      typeProvider.setType(
+          String.format("%s(%s)", functionName, path.getFullPath()),
+          SchemaUtils.getSeriesTypeByPath(path, functionName));
+    }
   }
 
-  public void planDeviceMerge(
-      Map<String, List<PlanNode>> deviceNameToSourceNodesMap, OrderBy mergeOrder) {
-    List<String> measurements =
-        deviceNameToSourceNodesMap.values().stream()
-            .flatMap(List::stream)
-            .map(node -> ((SeriesScanNode) node).getSeriesPath().getMeasurement())
-            .distinct()
-            .collect(Collectors.toList());
+  private PlanNode convergeWithTimeJoin(List<PlanNode> sourceNodes, OrderBy mergeOrder) {
+    PlanNode tmpNode;
+    if (sourceNodes.size() == 1) {
+      tmpNode = sourceNodes.get(0);
+    } else {
+      tmpNode = new TimeJoinNode(context.getQueryId().genPlanNodeId(), mergeOrder, sourceNodes);
+    }
+    return tmpNode;
+  }
+
+  public LogicalPlanBuilder planDeviceView(
+      Map<String, PlanNode> deviceNameToSourceNodesMap,
+      List<String> outputColumnNames,
+      Map<String, List<Integer>> deviceToMeasurementIndexesMap,
+      OrderBy mergeOrder) {
     DeviceViewNode deviceViewNode =
         new DeviceViewNode(
             context.getQueryId().genPlanNodeId(),
             Arrays.asList(OrderBy.DEVICE_ASC, mergeOrder),
-            measurements);
-    for (Map.Entry<String, List<PlanNode>> entry : deviceNameToSourceNodesMap.entrySet()) {
+            outputColumnNames,
+            deviceToMeasurementIndexesMap);
+    for (Map.Entry<String, PlanNode> entry : deviceNameToSourceNodesMap.entrySet()) {
       String deviceName = entry.getKey();
-      List<PlanNode> planNodes = new ArrayList<>(entry.getValue());
-      deviceViewNode.addChildDeviceNode(deviceName, convergeWithTimeJoin(planNodes, mergeOrder));
+      PlanNode subPlan = entry.getValue();
+      deviceViewNode.addChildDeviceNode(deviceName, subPlan);
     }
+
     this.root = deviceViewNode;
+    return this;
   }
 
-  private PlanNode convergeWithTimeJoin(List<PlanNode> sourceNodes, OrderBy mergeOrder) {
-    PlanNode tmpNode;
-    if (sourceNodes.size() == 1) {
-      tmpNode = sourceNodes.get(0);
+  public LogicalPlanBuilder planGroupByLevel(
+      Map<Expression, Set<Expression>> groupByLevelExpressions, AggregationStep curStep) {
+    if (groupByLevelExpressions == null) {
+      return this;
+    }
+
+    this.root =
+        createGroupByTLevelNode(
+            Collections.singletonList(this.getRoot()), groupByLevelExpressions, curStep);
+    return this;
+  }
+
+  public LogicalPlanBuilder planAggregation(
+      Map<String, Set<Expression>> aggregationExpressions,
+      GroupByTimeParameter groupByTimeParameter,
+      AggregationStep curStep,
+      TypeProvider typeProvider) {
+    if (aggregationExpressions == null) {
+      return this;
+    }
+
+    List<AggregationDescriptor> aggregationDescriptorList =
+        constructAggregationDescriptorList(aggregationExpressions, curStep);
+    if (curStep.isOutputPartial()) {
+      aggregationDescriptorList.forEach(
+          aggregationDescriptor -> {
+            updateTypeProviderByPartialAggregation(aggregationDescriptor, typeProvider);
+          });
+    }
+    this.root =
+        new AggregationNode(
+            context.getQueryId().genPlanNodeId(),
+            Collections.singletonList(this.getRoot()),
+            aggregationDescriptorList,
+            groupByTimeParameter);
+    return this;
+  }
+
+  public LogicalPlanBuilder planGroupByTime(
+      Map<String, Set<Expression>> aggregationExpressions,
+      GroupByTimeParameter groupByTimeParameter,
+      AggregationStep curStep) {
+    if (aggregationExpressions == null) {
+      return this;
+    }
+
+    this.root =
+        createGroupByTimeNode(
+            Collections.singletonList(this.getRoot()),
+            aggregationExpressions,
+            groupByTimeParameter,
+            curStep);
+    return this;
+  }
+
+  private PlanNode createGroupByTimeNode(
+      List<PlanNode> children,
+      Map<String, Set<Expression>> aggregationExpressions,
+      GroupByTimeParameter groupByTimeParameter,
+      AggregationStep curStep) {
+    List<AggregationDescriptor> aggregationDescriptorList =
+        constructAggregationDescriptorList(aggregationExpressions, curStep);
+    return new GroupByTimeNode(
+        context.getQueryId().genPlanNodeId(),
+        children,
+        aggregationDescriptorList,
+        groupByTimeParameter);
+  }
+
+  private PlanNode createGroupByTLevelNode(
+      List<PlanNode> children,
+      Map<Expression, Set<Expression>> groupByLevelExpressions,
+      AggregationStep curStep) {
+    List<String> outputColumnNames = new ArrayList<>();
+    List<AggregationDescriptor> aggregationDescriptorList = new ArrayList<>();
+    for (Expression groupedExpression : groupByLevelExpressions.keySet()) {
+      AggregationType aggregationFunction =
+          AggregationType.valueOf(
+              ((FunctionExpression) groupedExpression).getFunctionName().toUpperCase());
+      outputColumnNames.add(groupedExpression.getExpressionString());
+      aggregationDescriptorList.add(
+          new AggregationDescriptor(
+              aggregationFunction,
+              curStep,
+              new ArrayList<>(groupByLevelExpressions.get(groupedExpression))));
+    }
+    return new GroupByLevelNode(
+        context.getQueryId().genPlanNodeId(),
+        children,
+        aggregationDescriptorList,
+        outputColumnNames);
+  }
+
+  private PlanNode createAggregationScanNode(
+      PartialPath selectPath,
+      List<AggregationDescriptor> aggregationDescriptorList,
+      OrderBy scanOrder,
+      GroupByTimeParameter groupByTimeParameter,
+      Filter timeFilter) {
+    if (selectPath instanceof MeasurementPath) { // non-aligned series
+      SeriesAggregationScanNode seriesAggregationScanNode =
+          new SeriesAggregationScanNode(
+              context.getQueryId().genPlanNodeId(),
+              (MeasurementPath) selectPath,
+              aggregationDescriptorList,
+              scanOrder,
+              groupByTimeParameter);
+      seriesAggregationScanNode.setTimeFilter(timeFilter);
+      return seriesAggregationScanNode;
+    } else if (selectPath instanceof AlignedPath) { // aligned series
+      AlignedSeriesAggregationScanNode alignedSeriesAggregationScanNode =
+          new AlignedSeriesAggregationScanNode(
+              context.getQueryId().genPlanNodeId(),
+              (AlignedPath) selectPath,
+              aggregationDescriptorList,
+              scanOrder,
+              groupByTimeParameter);
+      alignedSeriesAggregationScanNode.setTimeFilter(timeFilter);
+      return alignedSeriesAggregationScanNode;
     } else {
-      tmpNode = new TimeJoinNode(context.getQueryId().genPlanNodeId(), mergeOrder, sourceNodes);
+      throw new IllegalArgumentException("unexpected path type");
     }
-    return tmpNode;
+  }
+
+  private List<AggregationDescriptor> constructAggregationDescriptorList(
+      Map<String, Set<Expression>> aggregationExpressions, AggregationStep curStep) {
+    return aggregationExpressions.values().stream()
+        .flatMap(Set::stream)
+        .map(
+            expression -> {
+              Validate.isTrue(expression instanceof FunctionExpression);
+              AggregationType aggregationFunction =
+                  AggregationType.valueOf(
+                      ((FunctionExpression) expression).getFunctionName().toUpperCase());
+              return new AggregationDescriptor(
+                  aggregationFunction, curStep, expression.getExpressions());
+            })
+        .collect(Collectors.toList());
+  }
+
+  public LogicalPlanBuilder planFilterAndTransform(
+      Expression queryFilter,
+      Set<Expression> selectExpressions,
+      boolean isGroupByTime,
+      ZoneId zoneId) {
+    if (queryFilter == null) {
+      return this;
+    }
+
+    this.root =
+        new FilterNode(
+            context.getQueryId().genPlanNodeId(),
+            this.getRoot(),
+            selectExpressions.toArray(new Expression[0]),
+            queryFilter,
+            isGroupByTime,
+            zoneId);
+    return this;
+  }
+
+  public LogicalPlanBuilder planTransform(
+      Set<Expression> selectExpressions, boolean isGroupByTime, ZoneId zoneId) {
+    boolean needTransform = false;
+    for (Expression expression : selectExpressions) {
+      if (ExpressionAnalyzer.checkIsNeedTransform(expression)) {
+        needTransform = true;
+        break;
+      }
+    }
+    if (!needTransform) {
+      return this;
+    }
+
+    this.root =
+        new TransformNode(
+            context.getQueryId().genPlanNodeId(),
+            this.getRoot(),
+            selectExpressions.toArray(new Expression[0]),
+            isGroupByTime,
+            zoneId);
+    return this;
+  }
+
+  public LogicalPlanBuilder planFilterNull(FilterNullParameter filterNullParameter) {
+    if (filterNullParameter == null) {
+      return this;
+    }
+
+    this.root =
+        new FilterNullNode(
+            context.getQueryId().genPlanNodeId(), this.getRoot(), filterNullParameter);
+    return this;
+  }
+
+  public LogicalPlanBuilder planFill(FillDescriptor fillDescriptor) {
+    if (fillDescriptor == null) {
+      return this;
+    }
+
+    this.root = new FillNode(context.getQueryId().genPlanNodeId(), this.getRoot(), fillDescriptor);
+    return this;
   }
 
   public LogicalPlanBuilder planLimit(int rowLimit) {
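
Taken together, the new builder methods are meant to be chained fluently; a minimal sketch of
the plan for a raw-data query without a value filter, abridged from visitQueryBody in
LogicalPlanner.java below (the analysis-derived arguments are assumed to be in scope):

    PlanNode plan =
        new LogicalPlanBuilder(context)
            .planRawDataSource(sourceExpressions, scanOrder, globalTimeFilter)
            .planTransform(selectExpressions, isGroupByTime, zoneId)
            .planFilterNull(filterNullParameter)
            .planFill(fillDescriptor)
            .planOffset(rowOffset)
            .planLimit(rowLimit)
            .getRoot();
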
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index 064c59971f..befcdb71f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
 import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
@@ -48,9 +49,17 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement
 import org.apache.iotdb.db.mpp.plan.statement.metadata.SchemaFetchStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTimeSeriesStatement;
+import org.apache.iotdb.db.query.expression.Expression;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /** Generate a logical plan for the statement. */
 public class LogicalPlanner {
@@ -96,14 +105,149 @@ public class LogicalPlanner {
 
     @Override
     public PlanNode visitQuery(QueryStatement queryStatement, MPPQueryContext context) {
-      return new LogicalPlanBuilder(context)
-          .planRawDataQuerySource(
-              analysis.getSourceExpressions(),
-              queryStatement.getResultOrder(),
-              queryStatement.isAlignByDevice())
-          .planOffset(queryStatement.getRowOffset())
-          .planLimit(queryStatement.getRowLimit())
-          .getRoot();
+      LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
+
+      if (queryStatement.isAlignByDevice()) {
+        Map<String, PlanNode> deviceToSubPlanMap = new HashMap<>();
+        for (String deviceName : analysis.getSourceExpressions().keySet()) {
+          LogicalPlanBuilder subPlanBuilder = new LogicalPlanBuilder(context);
+          subPlanBuilder =
+              subPlanBuilder.withNewRoot(
+                  visitQueryBody(
+                      queryStatement,
+                      Maps.asMap(
+                          Sets.newHashSet(deviceName),
+                          (key) -> analysis.getSourceExpressions().get(key)),
+                      Maps.asMap(
+                          Sets.newHashSet(deviceName),
+                          (key) -> analysis.getAggregationExpressions().get(key)),
+                      analysis.getSourceExpressions().get(deviceName),
+                      analysis.getDeviceToQueryFilter() != null
+                          ? analysis.getDeviceToQueryFilter().get(deviceName)
+                          : null,
+                      context));
+          deviceToSubPlanMap.put(deviceName, subPlanBuilder.getRoot());
+        }
+        // convert to ALIGN BY DEVICE view
+        planBuilder =
+            planBuilder.planDeviceView(
+                deviceToSubPlanMap,
+                analysis.getRespDatasetHeader().getRespColumns().stream()
+                    .distinct()
+                    .collect(Collectors.toList()),
+                analysis.getDeviceToMeasurementIndexesMap(),
+                queryStatement.getResultOrder());
+      } else {
+        planBuilder =
+            planBuilder.withNewRoot(
+                visitQueryBody(
+                    queryStatement,
+                    analysis.getSourceExpressions(),
+                    analysis.getAggregationExpressions(),
+                    analysis.getSelectExpressions(),
+                    analysis.getQueryFilter(),
+                    context));
+      }
+
+      // other common upstream node
+      planBuilder =
+          planBuilder
+              .planFilterNull(analysis.getFilterNullParameter())
+              .planFill(analysis.getFillDescriptor())
+              .planOffset(queryStatement.getRowOffset())
+              .planLimit(queryStatement.getRowLimit());
+
+      return planBuilder.getRoot();
+    }
+
+    public PlanNode visitQueryBody(
+        QueryStatement queryStatement,
+        Map<String, Set<Expression>> sourceExpressions,
+        Map<String, Set<Expression>> aggregationExpressions,
+        Set<Expression> selectExpressions,
+        Expression queryFilter,
+        MPPQueryContext context) {
+      LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
+      boolean isRawDataSource =
+          !queryStatement.isAggregationQuery()
+              || (queryStatement.isAggregationQuery() && analysis.hasValueFilter());
+
+      // plan data source node
+      if (isRawDataSource) {
+        planBuilder =
+            planBuilder.planRawDataSource(
+                sourceExpressions, queryStatement.getResultOrder(), analysis.getGlobalTimeFilter());
+
+        if (queryStatement.isAggregationQuery()) {
+          if (analysis.hasValueFilter()) {
+            planBuilder =
+                planBuilder.planFilterAndTransform(
+                    queryFilter,
+                    sourceExpressions.values().stream()
+                        .flatMap(Set::stream)
+                        .collect(Collectors.toSet()),
+                    queryStatement.isGroupByTime(),
+                    queryStatement.getSelectComponent().getZoneId());
+          }
+
+          boolean outputPartial =
+              queryStatement.isGroupByLevel()
+                  || (queryStatement.isGroupByTime()
+                      && analysis.getGroupByTimeParameter().hasOverlap());
+          AggregationStep curStep = outputPartial ? AggregationStep.PARTIAL : AggregationStep.FINAL;
+          planBuilder =
+              planBuilder.planAggregation(
+                  aggregationExpressions,
+                  analysis.getGroupByTimeParameter(),
+                  curStep,
+                  analysis.getTypeProvider());
+
+          if (curStep.isOutputPartial()) {
+            if (queryStatement.isGroupByTime() && analysis.getGroupByTimeParameter().hasOverlap()) {
+              curStep =
+                  queryStatement.isGroupByLevel()
+                      ? AggregationStep.INTERMEDIATE
+                      : AggregationStep.FINAL;
+              planBuilder =
+                  planBuilder.planGroupByTime(
+                      aggregationExpressions, analysis.getGroupByTimeParameter(), curStep);
+            }
+
+            if (queryStatement.isGroupByLevel()) {
+              curStep = AggregationStep.FINAL;
+              planBuilder =
+                  planBuilder.planGroupByLevel(analysis.getGroupByLevelExpressions(), curStep);
+            }
+          }
+        } else {
+          if (analysis.hasValueFilter()) {
+            planBuilder =
+                planBuilder.planFilterAndTransform(
+                    queryFilter,
+                    selectExpressions,
+                    queryStatement.isGroupByTime(),
+                    queryStatement.getSelectComponent().getZoneId());
+          } else {
+            planBuilder =
+                planBuilder.planTransform(
+                    selectExpressions,
+                    queryStatement.isGroupByTime(),
+                    queryStatement.getSelectComponent().getZoneId());
+          }
+        }
+      } else {
+        planBuilder =
+            planBuilder.planAggregationSource(
+                sourceExpressions,
+                queryStatement.getResultOrder(),
+                analysis.getGlobalTimeFilter(),
+                analysis.getGroupByTimeParameter(),
+                aggregationExpressions,
+                analysis.getGroupByLevelExpressions(),
+                analysis.getTypeProvider());
+      }
+
+      return planBuilder.getRoot();
     }
 
     @Override
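
As a reading aid for the AggregationStep handling in visitQueryBody and planAggregationSource,
the step progression for an aggregation query with overlapping GROUP BY TIME windows and
GROUP BY LEVEL (example query is illustrative only):

    // select count(s1) from root.sg.* group by ([0, 100), 20ms, 10ms), level = 1   (no value filter)
    //   SeriesAggregationScanNode -> AggregationStep.PARTIAL       (level / overlap still pending)
    //   GroupByTimeNode           -> AggregationStep.INTERMEDIATE  (GROUP BY LEVEL still pending)
    //   GroupByLevelNode          -> AggregationStep.FINAL
    // With a value filter, the same progression is driven from visitQueryBody, starting from raw
    // scans plus FilterNode and an AggregationNode instead of aggregation scan nodes.
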
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
index d9c049a389..589e666297 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
@@ -27,7 +27,9 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 /**
@@ -50,21 +52,35 @@ public class DeviceViewNode extends ProcessNode {
   // each child node whose output TsBlock contains the data belonged to one device.
   private final List<PlanNode> children = new ArrayList<>();
 
-  // measurement columns in result output
-  private final List<String> measurements;
+  // Device column and measurement columns in result output
+  private final List<String> outputColumnNames;
 
-  public DeviceViewNode(PlanNodeId id, List<OrderBy> mergeOrders, List<String> measurements) {
+  // e.g. if [s1, s2, s3] is queried but only [s1, s3] exist under device1, then device1 -> [1, 3];
+  // s1 maps to index 1 rather than 0 because the device name is the first output column
+  private Map<String, List<Integer>> deviceToMeasurementIndexesMap;
+
+  public DeviceViewNode(
+      PlanNodeId id,
+      List<OrderBy> mergeOrders,
+      List<String> outputColumnNames,
+      Map<String, List<Integer>> deviceToMeasurementIndexesMap) {
     super(id);
     this.mergeOrders = mergeOrders;
-    this.measurements = measurements;
+    this.outputColumnNames = outputColumnNames;
+    this.deviceToMeasurementIndexesMap = deviceToMeasurementIndexesMap;
   }
 
   public DeviceViewNode(
-      PlanNodeId id, List<OrderBy> mergeOrders, List<String> measurements, List<String> devices) {
+      PlanNodeId id,
+      List<OrderBy> mergeOrders,
+      List<String> outputColumnNames,
+      List<String> devices,
+      Map<String, List<Integer>> deviceToMeasurementIndexesMap) {
     super(id);
     this.mergeOrders = mergeOrders;
-    this.measurements = measurements;
+    this.outputColumnNames = outputColumnNames;
     this.devices.addAll(devices);
+    this.deviceToMeasurementIndexesMap = deviceToMeasurementIndexesMap;
   }
 
   public void addChildDeviceNode(String deviceName, PlanNode childNode) {
@@ -76,6 +92,10 @@ public class DeviceViewNode extends ProcessNode {
     return devices;
   }
 
+  public Map<String, List<Integer>> getDeviceToMeasurementIndexesMap() {
+    return deviceToMeasurementIndexesMap;
+  }
+
   @Override
   public List<PlanNode> getChildren() {
     return children;
@@ -93,12 +113,13 @@ public class DeviceViewNode extends ProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new DeviceViewNode(getPlanNodeId(), mergeOrders, measurements, devices);
+    return new DeviceViewNode(
+        getPlanNodeId(), mergeOrders, outputColumnNames, devices, deviceToMeasurementIndexesMap);
   }
 
   @Override
   public List<String> getOutputColumnNames() {
-    return measurements;
+    return outputColumnNames;
   }
 
   @Override
@@ -111,25 +132,33 @@ public class DeviceViewNode extends ProcessNode {
     PlanNodeType.DEVICE_VIEW.serialize(byteBuffer);
     ReadWriteIOUtils.write(mergeOrders.get(0).ordinal(), byteBuffer);
     ReadWriteIOUtils.write(mergeOrders.get(1).ordinal(), byteBuffer);
-    ReadWriteIOUtils.write(measurements.size(), byteBuffer);
-    for (String measurement : measurements) {
-      ReadWriteIOUtils.write(measurement, byteBuffer);
+    ReadWriteIOUtils.write(outputColumnNames.size(), byteBuffer);
+    for (String column : outputColumnNames) {
+      ReadWriteIOUtils.write(column, byteBuffer);
     }
     ReadWriteIOUtils.write(devices.size(), byteBuffer);
     for (String deviceName : devices) {
       ReadWriteIOUtils.write(deviceName, byteBuffer);
     }
+    ReadWriteIOUtils.write(deviceToMeasurementIndexesMap.size(), byteBuffer);
+    for (Map.Entry<String, List<Integer>> entry : deviceToMeasurementIndexesMap.entrySet()) {
+      ReadWriteIOUtils.write(entry.getKey(), byteBuffer);
+      ReadWriteIOUtils.write(entry.getValue().size(), byteBuffer);
+      for (Integer index : entry.getValue()) {
+        ReadWriteIOUtils.write(index, byteBuffer);
+      }
+    }
   }
 
   public static DeviceViewNode deserialize(ByteBuffer byteBuffer) {
     List<OrderBy> mergeOrders = new ArrayList<>();
     mergeOrders.add(OrderBy.values()[ReadWriteIOUtils.readInt(byteBuffer)]);
     mergeOrders.add(OrderBy.values()[ReadWriteIOUtils.readInt(byteBuffer)]);
-    int measurementsSize = ReadWriteIOUtils.readInt(byteBuffer);
-    List<String> measurements = new ArrayList<>();
-    while (measurementsSize > 0) {
-      measurements.add(ReadWriteIOUtils.readString(byteBuffer));
-      measurementsSize--;
+    int columnSize = ReadWriteIOUtils.readInt(byteBuffer);
+    List<String> outputColumnNames = new ArrayList<>();
+    while (columnSize > 0) {
+      outputColumnNames.add(ReadWriteIOUtils.readString(byteBuffer));
+      columnSize--;
     }
     int devicesSize = ReadWriteIOUtils.readInt(byteBuffer);
     List<String> devices = new ArrayList<>();
@@ -137,8 +166,22 @@ public class DeviceViewNode extends ProcessNode {
       devices.add(ReadWriteIOUtils.readString(byteBuffer));
       devicesSize--;
     }
+    int mapSize = ReadWriteIOUtils.readInt(byteBuffer);
+    Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>(mapSize);
+    while (mapSize > 0) {
+      String deviceName = ReadWriteIOUtils.readString(byteBuffer);
+      int listSize = ReadWriteIOUtils.readInt(byteBuffer);
+      List<Integer> indexes = new ArrayList<>(listSize);
+      while (listSize > 0) {
+        indexes.add(ReadWriteIOUtils.readInt(byteBuffer));
+        listSize--;
+      }
+      deviceToMeasurementIndexesMap.put(deviceName, indexes);
+      mapSize--;
+    }
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new DeviceViewNode(planNodeId, mergeOrders, measurements, devices);
+    return new DeviceViewNode(
+        planNodeId, mergeOrders, outputColumnNames, devices, deviceToMeasurementIndexesMap);
   }
 
   @Override
@@ -156,11 +199,18 @@ public class DeviceViewNode extends ProcessNode {
     return mergeOrders.equals(that.mergeOrders)
         && devices.equals(that.devices)
         && children.equals(that.children)
-        && measurements.equals(that.measurements);
+        && outputColumnNames.equals(that.outputColumnNames)
+        && deviceToMeasurementIndexesMap.equals(that.deviceToMeasurementIndexesMap);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), mergeOrders, devices, children, measurements);
+    return Objects.hash(
+        super.hashCode(),
+        mergeOrders,
+        devices,
+        children,
+        outputColumnNames,
+        deviceToMeasurementIndexesMap);
   }
 }
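
The new deviceToMeasurementIndexesMap records, for each device, the positions in
outputColumnNames that the device's measurements map to, and it is now serialized and
compared together with the other fields above. A minimal, self-contained sketch of how
the map lines up with the output header, mirroring the ALIGN BY DEVICE test case later
in this patch (the class name, device names and column layout are illustrative only):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class DeviceViewIndexesSketch {
      public static void main(String[] args) {
        // Output header of the device view: column 0 is the Device column,
        // the remaining columns are the measurements in their fixed output order.
        List<String> outputColumnNames = Arrays.asList("Device", "s3", "s4", "s1", "s2");

        Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
        // root.sg.d1 produces s3, s1, s2 -> header positions 1, 3, 4
        deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 3, 4));
        // root.sg.d2 produces s4, s1, s2 -> header positions 2, 3, 4
        deviceToMeasurementIndexesMap.put("root.sg.d2", Arrays.asList(2, 3, 4));

        System.out.println(outputColumnNames + " / " + deviceToMeasurementIndexesMap);
      }
    }
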
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
index d4a6ae36eb..e072d6d3f7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
@@ -162,11 +162,11 @@ public class GroupByLevelNode extends AggregationNode {
       return false;
     }
     GroupByLevelNode that = (GroupByLevelNode) o;
-    return outputColumnNames.equals(that.outputColumnNames) && children.equals(that.children);
+    return outputColumnNames.equals(that.outputColumnNames);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), outputColumnNames, children);
+    return Objects.hash(super.hashCode(), outputColumnNames);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
index 88bdc54be6..c57e6692b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
@@ -81,9 +81,11 @@ public class AlignedSeriesAggregationScanNode extends SourceNode {
       PlanNodeId id,
       AlignedPath alignedPath,
       List<AggregationDescriptor> aggregationDescriptorList,
-      OrderBy scanOrder) {
+      OrderBy scanOrder,
+      @Nullable GroupByTimeParameter groupByTimeParameter) {
     this(id, alignedPath, aggregationDescriptorList);
     this.scanOrder = scanOrder;
+    this.groupByTimeParameter = groupByTimeParameter;
   }
 
   public AlignedSeriesAggregationScanNode(
@@ -94,9 +96,8 @@ public class AlignedSeriesAggregationScanNode extends SourceNode {
       @Nullable Filter timeFilter,
       @Nullable GroupByTimeParameter groupByTimeParameter,
       TRegionReplicaSet dataRegionReplicaSet) {
-    this(id, alignedPath, aggregationDescriptorList, scanOrder);
+    this(id, alignedPath, aggregationDescriptorList, scanOrder, groupByTimeParameter);
     this.timeFilter = timeFilter;
-    this.groupByTimeParameter = groupByTimeParameter;
     this.regionReplicaSet = dataRegionReplicaSet;
   }
 
@@ -117,6 +118,10 @@ public class AlignedSeriesAggregationScanNode extends SourceNode {
     return timeFilter;
   }
 
+  public void setTimeFilter(@Nullable Filter timeFilter) {
+    this.timeFilter = timeFilter;
+  }
+
   @Nullable
   public GroupByTimeParameter getGroupByTimeParameter() {
     return groupByTimeParameter;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java
index fd1df944d3..749c653e0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java
@@ -107,6 +107,10 @@ public class AlignedSeriesScanNode extends SourceNode {
     return timeFilter;
   }
 
+  public void setTimeFilter(@Nullable Filter timeFilter) {
+    this.timeFilter = timeFilter;
+  }
+
   @Nullable
   public Filter getValueFilter() {
     return valueFilter;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
index f3963c5036..abf3e5b8d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
-import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -40,10 +39,8 @@ import javax.annotation.Nullable;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -70,9 +67,6 @@ public class SeriesAggregationScanNode extends SourceNode {
   // result TsBlock
   private final List<AggregationDescriptor> aggregationDescriptorList;
 
-  // all the sensors in seriesPath's device of current query
-  private final Set<String> allSensors;
-
   // The order to traverse the data.
   // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
   // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
@@ -91,11 +85,9 @@ public class SeriesAggregationScanNode extends SourceNode {
   public SeriesAggregationScanNode(
       PlanNodeId id,
       MeasurementPath seriesPath,
-      List<AggregationDescriptor> aggregationDescriptorList,
-      Set<String> allSensors) {
+      List<AggregationDescriptor> aggregationDescriptorList) {
     super(id);
     this.seriesPath = seriesPath;
-    this.allSensors = allSensors;
     this.aggregationDescriptorList = aggregationDescriptorList;
   }
 
@@ -103,24 +95,23 @@ public class SeriesAggregationScanNode extends SourceNode {
       PlanNodeId id,
       MeasurementPath seriesPath,
       List<AggregationDescriptor> aggregationDescriptorList,
-      Set<String> allSensors,
-      OrderBy scanOrder) {
-    this(id, seriesPath, aggregationDescriptorList, allSensors);
+      OrderBy scanOrder,
+      @Nullable GroupByTimeParameter groupByTimeParameter) {
+    this(id, seriesPath, aggregationDescriptorList);
     this.scanOrder = scanOrder;
+    this.groupByTimeParameter = groupByTimeParameter;
   }
 
   public SeriesAggregationScanNode(
       PlanNodeId id,
       MeasurementPath seriesPath,
       List<AggregationDescriptor> aggregationDescriptorList,
-      Set<String> allSensors,
       OrderBy scanOrder,
       @Nullable Filter timeFilter,
       @Nullable GroupByTimeParameter groupByTimeParameter,
       TRegionReplicaSet dataRegionReplicaSet) {
-    this(id, seriesPath, aggregationDescriptorList, allSensors, scanOrder);
+    this(id, seriesPath, aggregationDescriptorList, scanOrder, groupByTimeParameter);
     this.timeFilter = timeFilter;
-    this.groupByTimeParameter = groupByTimeParameter;
     this.regionReplicaSet = dataRegionReplicaSet;
   }
 
@@ -128,15 +119,15 @@ public class SeriesAggregationScanNode extends SourceNode {
     return scanOrder;
   }
 
-  public Set<String> getAllSensors() {
-    return allSensors;
-  }
-
   @Nullable
   public Filter getTimeFilter() {
     return timeFilter;
   }
 
+  public void setTimeFilter(@Nullable Filter timeFilter) {
+    this.timeFilter = timeFilter;
+  }
+
   @Nullable
   public GroupByTimeParameter getGroupByTimeParameter() {
     return groupByTimeParameter;
@@ -150,16 +141,6 @@ public class SeriesAggregationScanNode extends SourceNode {
     return aggregationDescriptorList;
   }
 
-  public List<AggregationType> getAggregateFuncList() {
-    return aggregationDescriptorList.stream()
-        .map(AggregationDescriptor::getAggregationType)
-        .collect(Collectors.toList());
-  }
-
-  public void setTimeFilter(Filter timeFilter) {
-    this.timeFilter = timeFilter;
-  }
-
   @Override
   public List<PlanNode> getChildren() {
     return ImmutableList.of();
@@ -181,7 +162,6 @@ public class SeriesAggregationScanNode extends SourceNode {
         getPlanNodeId(),
         getSeriesPath(),
         getAggregationDescriptorList(),
-        getAllSensors(),
         getScanOrder(),
         getTimeFilter(),
         getGroupByTimeParameter(),
@@ -225,10 +205,6 @@ public class SeriesAggregationScanNode extends SourceNode {
     for (AggregationDescriptor aggregationDescriptor : aggregationDescriptorList) {
       aggregationDescriptor.serialize(byteBuffer);
     }
-    ReadWriteIOUtils.write(allSensors.size(), byteBuffer);
-    for (String sensor : allSensors) {
-      ReadWriteIOUtils.write(sensor, byteBuffer);
-    }
     ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer);
     if (timeFilter == null) {
       ReadWriteIOUtils.write((byte) 0, byteBuffer);
@@ -252,11 +228,6 @@ public class SeriesAggregationScanNode extends SourceNode {
     for (int i = 0; i < aggregateDescriptorSize; i++) {
       aggregationDescriptorList.add(AggregationDescriptor.deserialize(byteBuffer));
     }
-    int allSensorsSize = ReadWriteIOUtils.readInt(byteBuffer);
-    Set<String> allSensors = new HashSet<>();
-    for (int i = 0; i < allSensorsSize; i++) {
-      allSensors.add(ReadWriteIOUtils.readString(byteBuffer));
-    }
     OrderBy scanOrder = OrderBy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
     byte isNull = ReadWriteIOUtils.readByte(byteBuffer);
     Filter timeFilter = null;
@@ -275,7 +246,6 @@ public class SeriesAggregationScanNode extends SourceNode {
         planNodeId,
         partialPath,
         aggregationDescriptorList,
-        allSensors,
         scanOrder,
         timeFilter,
         groupByTimeParameter,
@@ -296,7 +266,6 @@ public class SeriesAggregationScanNode extends SourceNode {
     SeriesAggregationScanNode that = (SeriesAggregationScanNode) o;
     return seriesPath.equals(that.seriesPath)
         && aggregationDescriptorList.equals(that.aggregationDescriptorList)
-        && allSensors.equals(that.allSensors)
         && scanOrder == that.scanOrder
         && Objects.equals(timeFilter, that.timeFilter)
         && Objects.equals(groupByTimeParameter, that.groupByTimeParameter)
@@ -309,7 +278,6 @@ public class SeriesAggregationScanNode extends SourceNode {
         super.hashCode(),
         seriesPath,
         aggregationDescriptorList,
-        allSensors,
         scanOrder,
         timeFilter,
         groupByTimeParameter,
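
With allSensors removed, a SeriesAggregationScanNode is now built from the plan node id,
the series path, the aggregation descriptors, the scan order and an optional GROUP BY TIME
parameter; the time filter is attached through the setter rather than a constructor argument.
A minimal construction sketch, assuming the same classes the tests in this patch use (the
path, plan node id and filter value are illustrative; IllegalPathException handling is omitted):

    MeasurementPath path = new MeasurementPath("root.sg.d1.s1", TSDataType.INT32);
    SeriesAggregationScanNode node =
        new SeriesAggregationScanNode(
            new PlanNodeId("0"),
            path,
            Collections.singletonList(
                new AggregationDescriptor(
                    AggregationType.COUNT,
                    AggregationStep.SINGLE,
                    Collections.singletonList(new TimeSeriesOperand(path)))),
            OrderBy.TIMESTAMP_ASC,
            null); // no GROUP BY TIME
    node.setTimeFilter(TimeFilter.gt(100)); // pushed-down time predicate
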
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesScanNode.java
index 539c5109fe..3b3fe8f05c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesScanNode.java
@@ -33,14 +33,11 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import com.google.common.collect.ImmutableList;
 
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.nio.ByteBuffer;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
-import java.util.Set;
 
 /**
  * SeriesScanNode is responsible for reading data of a specific series. When reading data, the
@@ -54,9 +51,6 @@ public class SeriesScanNode extends SourceNode {
   // The path of the target series which will be scanned.
   private final MeasurementPath seriesPath;
 
-  // all the sensors in seriesPath's device of current query
-  @Nonnull private final Set<String> allSensors;
-
   // The order to traverse the data.
   // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
   // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
@@ -77,30 +71,26 @@ public class SeriesScanNode extends SourceNode {
   // The id of DataRegion where the node will run
   private TRegionReplicaSet regionReplicaSet;
 
-  public SeriesScanNode(
-      PlanNodeId id, MeasurementPath seriesPath, @Nonnull Set<String> allSensors) {
+  public SeriesScanNode(PlanNodeId id, MeasurementPath seriesPath) {
     super(id);
     this.seriesPath = seriesPath;
-    this.allSensors = allSensors;
   }
 
-  public SeriesScanNode(
-      PlanNodeId id, MeasurementPath seriesPath, Set<String> allSensors, OrderBy scanOrder) {
-    this(id, seriesPath, allSensors);
+  public SeriesScanNode(PlanNodeId id, MeasurementPath seriesPath, OrderBy scanOrder) {
+    this(id, seriesPath);
     this.scanOrder = scanOrder;
   }
 
   public SeriesScanNode(
       PlanNodeId id,
       MeasurementPath seriesPath,
-      Set<String> allSensors,
       OrderBy scanOrder,
       @Nullable Filter timeFilter,
       @Nullable Filter valueFilter,
       int limit,
       int offset,
       TRegionReplicaSet dataRegionReplicaSet) {
-    this(id, seriesPath, allSensors, scanOrder);
+    this(id, seriesPath, scanOrder);
     this.timeFilter = timeFilter;
     this.valueFilter = valueFilter;
     this.limit = limit;
@@ -140,11 +130,6 @@ public class SeriesScanNode extends SourceNode {
     this.offset = offset;
   }
 
-  @Nonnull
-  public Set<String> getAllSensors() {
-    return allSensors;
-  }
-
   public OrderBy getScanOrder() {
     return scanOrder;
   }
@@ -162,6 +147,10 @@ public class SeriesScanNode extends SourceNode {
     return timeFilter;
   }
 
+  public void setTimeFilter(@Nullable Filter timeFilter) {
+    this.timeFilter = timeFilter;
+  }
+
   @Nullable
   public Filter getValueFilter() {
     return valueFilter;
@@ -187,7 +176,6 @@ public class SeriesScanNode extends SourceNode {
     return new SeriesScanNode(
         getPlanNodeId(),
         getSeriesPath(),
-        getAllSensors(),
         getScanOrder(),
         getTimeFilter(),
         getValueFilter(),
@@ -210,10 +198,6 @@ public class SeriesScanNode extends SourceNode {
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.SERIES_SCAN.serialize(byteBuffer);
     seriesPath.serialize(byteBuffer);
-    ReadWriteIOUtils.write(allSensors.size(), byteBuffer);
-    for (String sensor : allSensors) {
-      ReadWriteIOUtils.write(sensor, byteBuffer);
-    }
     ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer);
     if (timeFilter == null) {
       ReadWriteIOUtils.write((byte) 0, byteBuffer);
@@ -234,11 +218,6 @@ public class SeriesScanNode extends SourceNode {
 
   public static SeriesScanNode deserialize(ByteBuffer byteBuffer) {
     MeasurementPath partialPath = (MeasurementPath) PathDeserializeUtil.deserialize(byteBuffer);
-    int allSensorSize = ReadWriteIOUtils.readInt(byteBuffer);
-    Set<String> allSensors = new HashSet<>();
-    for (int i = 0; i < allSensorSize; i++) {
-      allSensors.add(ReadWriteIOUtils.readString(byteBuffer));
-    }
     OrderBy scanOrder = OrderBy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
     byte isNull = ReadWriteIOUtils.readByte(byteBuffer);
     Filter timeFilter = null;
@@ -258,7 +237,6 @@ public class SeriesScanNode extends SourceNode {
     return new SeriesScanNode(
         planNodeId,
         partialPath,
-        allSensors,
         scanOrder,
         timeFilter,
         valueFilter,
@@ -289,7 +267,6 @@ public class SeriesScanNode extends SourceNode {
     return limit == that.limit
         && offset == that.offset
         && seriesPath.equals(that.seriesPath)
-        && allSensors.equals(that.allSensors)
         && scanOrder == that.scanOrder
         && Objects.equals(timeFilter, that.timeFilter)
         && Objects.equals(valueFilter, that.valueFilter)
@@ -301,7 +278,6 @@ public class SeriesScanNode extends SourceNode {
     return Objects.hash(
         super.hashCode(),
         seriesPath,
-        allSensors,
         scanOrder,
         timeFilter,
         valueFilter,
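
The simplified SeriesScanNode constructors mirror the aggregation variant above: only the
plan node id, the measurement path and optionally the scan order are required, and the time
filter is set afterwards. A one-step construction sketch with an illustrative path
(IllegalPathException handling omitted):

    SeriesScanNode scanNode =
        new SeriesScanNode(
            new PlanNodeId("1"),
            new MeasurementPath("root.sg.d1.s2", TSDataType.DOUBLE),
            OrderBy.TIMESTAMP_DESC);
    scanNode.setTimeFilter(TimeFilter.gt(100));
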
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
index e7f0b42e36..1f1923a6b2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
@@ -34,7 +34,7 @@ public class AggregationDescriptor {
   private final AggregationType aggregationType;
 
   // indicate the input and output type
-  private final AggregationStep step;
+  private AggregationStep step;
 
   /**
    * Input of aggregation function. Currently, we only support one series in the aggregation
@@ -63,6 +63,14 @@ public class AggregationDescriptor {
     return step;
   }
 
+  public void setStep(AggregationStep step) {
+    this.step = step;
+  }
+
+  public List<Expression> getInputExpressions() {
+    return inputExpressions;
+  }
+
   public void serialize(ByteBuffer byteBuffer) {
     ReadWriteIOUtils.write(aggregationType.ordinal(), byteBuffer);
     step.serialize(byteBuffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByTimeParameter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByTimeParameter.java
index 468b36e6a2..1210bc55ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByTimeParameter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByTimeParameter.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
+import static org.apache.iotdb.db.qp.utils.DatetimeUtils.MS_TO_MONTH;
+
 /** The parameter of `GROUP BY TIME` */
 public class GroupByTimeParameter {
 
@@ -135,6 +137,12 @@ public class GroupByTimeParameter {
     this.leftCRightO = leftCRightO;
   }
 
+  public boolean hasOverlap() {
+    long tmpInterval = isIntervalByMonth ? interval * MS_TO_MONTH : interval;
+    long tmpSlidingStep = isSlidingStepByMonth ? slidingStep * MS_TO_MONTH : slidingStep;
+    return tmpInterval > tmpSlidingStep;
+  }
+
   public void serialize(ByteBuffer buffer) {
     ReadWriteIOUtils.write(startTime, buffer);
     ReadWriteIOUtils.write(endTime, buffer);
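
hasOverlap() reports whether consecutive GROUP BY TIME windows overlap: after normalizing
by-month values to milliseconds, the interval is simply compared with the sliding step. A
self-contained sketch of the same comparison with illustrative numbers (plain long
arithmetic, ignoring the by-month conversion; the class name is hypothetical):

    public class GroupByOverlapSketch {
      public static void main(String[] args) {
        long interval = 10;    // window length
        long slidingStep = 5;  // distance between window starts
        boolean hasOverlap = interval > slidingStep; // true: each window overlaps the next

        // Windows produced with these values: [0, 10), [5, 15), [10, 20), ...
        for (long windowStart = 0; windowStart < 30; windowStart += slidingStep) {
          System.out.println("[" + windowStart + ", " + (windowStart + interval) + ")");
        }
        System.out.println("hasOverlap = " + hasOverlap);
      }
    }
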
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
index 72f275653a..ae6923c08a 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
@@ -27,7 +27,9 @@ import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
 import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -42,6 +44,7 @@ import org.slf4j.LoggerFactory;
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumMap;
@@ -205,6 +208,33 @@ public class SchemaUtils {
     }
   }
 
+  /**
+   * Judge whether the order of aggregation calculation is consistent with the order of traversing
+   * the data.
+   */
+  public static boolean isConsistentWithScanOrder(
+      AggregationType aggregationFunction, OrderBy scanOrder) {
+    boolean ascending = scanOrder == OrderBy.TIMESTAMP_ASC;
+    switch (aggregationFunction) {
+      case MIN_TIME:
+      case FIRST_VALUE:
+        return ascending;
+      case MAX_TIME:
+      case LAST_VALUE:
+        return !ascending;
+      case SUM:
+      case MIN_VALUE:
+      case MAX_VALUE:
+      case EXTREME:
+      case COUNT:
+      case AVG:
+        return true;
+      default:
+        throw new IllegalArgumentException(
+            String.format("Invalid Aggregation function: %s", aggregationFunction));
+    }
+  }
+
   /**
    * If e or one of its recursive causes is a PathNotExistException or StorageGroupNotSetException,
    * return such an exception or null if it cannot be found.
@@ -233,4 +263,26 @@ public class SchemaUtils {
           String.format("encoding %s does not support %s", encoding, dataType), true);
     }
   }
+
+  public static List<AggregationType> splitPartialAggregation(AggregationType aggregationType) {
+    switch (aggregationType) {
+      case FIRST_VALUE:
+        return Collections.singletonList(AggregationType.MIN_TIME);
+      case LAST_VALUE:
+        return Collections.singletonList(AggregationType.MAX_TIME);
+      case AVG:
+        return Arrays.asList(AggregationType.COUNT, AggregationType.SUM);
+      case SUM:
+      case MIN_VALUE:
+      case MAX_VALUE:
+      case EXTREME:
+      case COUNT:
+      case MIN_TIME:
+      case MAX_TIME:
+        return Collections.emptyList();
+      default:
+        throw new IllegalArgumentException(
+            String.format("Invalid Aggregation function: %s", aggregationType));
+    }
+  }
 }
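
The two helpers added here are small utilities: isConsistentWithScanOrder decides whether an
aggregation can be finalized directly in the given scan order, and splitPartialAggregation
lists the intermediate aggregations a function is assembled from (AVG from COUNT and SUM,
FIRST_VALUE needing MIN_TIME, and so on). A short usage sketch, assuming the IoTDB classes
imported in this file are on the classpath (the wrapper class name is illustrative):

    import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
    import org.apache.iotdb.db.query.aggregation.AggregationType;
    import org.apache.iotdb.db.utils.SchemaUtils;

    import java.util.List;

    public class SchemaUtilsSketch {
      public static void main(String[] args) {
        // FIRST_VALUE can be finalized while scanning ascending, but not descending.
        boolean ascOk =
            SchemaUtils.isConsistentWithScanOrder(
                AggregationType.FIRST_VALUE, OrderBy.TIMESTAMP_ASC);  // true
        boolean descOk =
            SchemaUtils.isConsistentWithScanOrder(
                AggregationType.FIRST_VALUE, OrderBy.TIMESTAMP_DESC); // false

        // AVG is assembled from the partial aggregations COUNT and SUM.
        List<AggregationType> partials =
            SchemaUtils.splitPartialAggregation(AggregationType.AVG); // [COUNT, SUM]

        System.out.println(ascOk + " " + descOk + " " + partials);
      }
    }
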
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
index 83ec2ff10b..e553a72666 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
@@ -55,7 +55,6 @@ import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
-import com.google.common.collect.Sets;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -75,7 +74,6 @@ public class DistributionPlannerTest {
         new SeriesScanNode(
             queryId.genPlanNodeId(),
             new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
-            Sets.newHashSet("s1", "s2"),
             OrderBy.TIMESTAMP_ASC);
 
     Analysis analysis = constructAnalysis();
@@ -96,7 +94,6 @@ public class DistributionPlannerTest {
         new SeriesScanNode(
             queryId.genPlanNodeId(),
             new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
-            Sets.newHashSet("s1", "s2"),
             OrderBy.TIMESTAMP_ASC);
 
     Analysis analysis = constructAnalysis();
@@ -119,19 +116,16 @@ public class DistributionPlannerTest {
         new SeriesScanNode(
             queryId.genPlanNodeId(),
             new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
-            Sets.newHashSet("s1", "s2"),
             OrderBy.TIMESTAMP_ASC));
     timeJoinNode.addChild(
         new SeriesScanNode(
             queryId.genPlanNodeId(),
             new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
-            Sets.newHashSet("s1", "s2"),
             OrderBy.TIMESTAMP_ASC));
     timeJoinNode.addChild(
         new SeriesScanNode(
             queryId.genPlanNodeId(),
             new MeasurementPath("root.sg.d22.s1", TSDataType.INT32),
-            Sets.newHashSet("s1"),
             OrderBy.TIMESTAMP_ASC));
 
     LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
@@ -200,19 +194,16 @@ public class DistributionPlannerTest {
         new SeriesScanNode(
             queryId.genPlanNodeId(),
             new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
-            Sets.newHashSet("s1", "s2"),
             OrderBy.TIMESTAMP_ASC));
     timeJoinNode.addChild(
         new SeriesScanNode(
             queryId.genPlanNodeId(),
             new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
-            Sets.newHashSet("s1", "s2"),
             OrderBy.TIMESTAMP_ASC));
     timeJoinNode.addChild(
         new SeriesScanNode(
             queryId.genPlanNodeId(),
             new MeasurementPath("root.sg.d22.s1", TSDataType.INT32),
-            Sets.newHashSet("s1"),
             OrderBy.TIMESTAMP_ASC));
 
     LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
@@ -240,19 +231,16 @@ public class DistributionPlannerTest {
         new SeriesScanNode(
             queryId.genPlanNodeId(),
             new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
-            Sets.newHashSet("s1", "s2"),
             OrderBy.TIMESTAMP_ASC));
     timeJoinNode.addChild(
         new SeriesScanNode(
             queryId.genPlanNodeId(),
             new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
-            Sets.newHashSet("s1", "s2"),
             OrderBy.TIMESTAMP_ASC));
     timeJoinNode.addChild(
         new SeriesScanNode(
             queryId.genPlanNodeId(),
             new MeasurementPath("root.sg.d22.s1", TSDataType.INT32),
-            Sets.newHashSet("s1"),
             OrderBy.TIMESTAMP_ASC));
 
     LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
@@ -278,19 +266,16 @@ public class DistributionPlannerTest {
         new SeriesScanNode(
             queryId.genPlanNodeId(),
             new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
-            Sets.newHashSet("s1", "s2"),
             OrderBy.TIMESTAMP_ASC));
     timeJoinNode.addChild(
         new SeriesScanNode(
             queryId.genPlanNodeId(),
             new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
-            Sets.newHashSet("s1", "s2"),
             OrderBy.TIMESTAMP_ASC));
     timeJoinNode.addChild(
         new SeriesScanNode(
             queryId.genPlanNodeId(),
             new MeasurementPath("root.sg.d333.s1", TSDataType.INT32),
-            Sets.newHashSet("s1"),
             OrderBy.TIMESTAMP_ASC));
 
     LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
index 0f16730f2e..53950b092a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
@@ -41,7 +41,6 @@ import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
 import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Sets;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
@@ -124,28 +123,19 @@ public class FragmentInstanceSerdeTest {
     TimeJoinNode timeJoinNode =
         new TimeJoinNode(new PlanNodeId("TimeJoinNode"), OrderBy.TIMESTAMP_DESC);
     SeriesScanNode seriesScanNode1 =
-        new SeriesScanNode(
-            new PlanNodeId("SeriesScanNode1"),
-            new MeasurementPath("root.sg.d1.s2"),
-            Sets.newHashSet("s2"));
+        new SeriesScanNode(new PlanNodeId("SeriesScanNode1"), new MeasurementPath("root.sg.d1.s2"));
     seriesScanNode1.setRegionReplicaSet(
         new TRegionReplicaSet(
             new TConsensusGroupId(TConsensusGroupType.DataRegion, 1), new ArrayList<>()));
     seriesScanNode1.setScanOrder(OrderBy.TIMESTAMP_DESC);
     SeriesScanNode seriesScanNode2 =
-        new SeriesScanNode(
-            new PlanNodeId("SeriesScanNode2"),
-            new MeasurementPath("root.sg.d2.s1"),
-            Sets.newHashSet("s1", "s2"));
+        new SeriesScanNode(new PlanNodeId("SeriesScanNode2"), new MeasurementPath("root.sg.d2.s1"));
     seriesScanNode2.setRegionReplicaSet(
         new TRegionReplicaSet(
             new TConsensusGroupId(TConsensusGroupType.DataRegion, 2), new ArrayList<>()));
     seriesScanNode2.setScanOrder(OrderBy.TIMESTAMP_DESC);
     SeriesScanNode seriesScanNode3 =
-        new SeriesScanNode(
-            new PlanNodeId("SeriesScanNode3"),
-            new MeasurementPath("root.sg.d2.s2"),
-            Sets.newHashSet("s1", "s2"));
+        new SeriesScanNode(new PlanNodeId("SeriesScanNode3"), new MeasurementPath("root.sg.d2.s2"));
     seriesScanNode3.setRegionReplicaSet(
         new TRegionReplicaSet(
             new TConsensusGroupId(TConsensusGroupType.DataRegion, 3), new ArrayList<>()));
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
index c500cd278e..6248ed7791 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
@@ -20,7 +20,10 @@
 package org.apache.iotdb.db.mpp.plan.plan;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.common.header.HeaderConstant;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
@@ -30,14 +33,16 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNullNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ProjectNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.statement.component.FilterNullPolicy;
 import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
+import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.expression.binary.GreaterThanExpression;
@@ -45,17 +50,17 @@ import org.apache.iotdb.db.query.expression.binary.LogicAndExpression;
 import org.apache.iotdb.db.query.expression.leaf.ConstantOperand;
 import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.query.expression.leaf.TimestampOperand;
+import org.apache.iotdb.db.query.expression.multi.FunctionExpression;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
-import org.apache.commons.compress.utils.Sets;
-
-import java.time.ZoneId;
+import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -68,46 +73,69 @@ public class QueryLogicalPlanUtil {
   // key: query statement; value: expected logical plan
   public static final Map<String, PlanNode> sqlToPlanMap = new HashMap<>();
 
-  public static final Map<String, MeasurementPath> schemaMap = new HashMap<>();
+  public static final Map<String, PartialPath> schemaMap = new HashMap<>();
 
   static {
     try {
       schemaMap.put("root.sg.d1.s1", new MeasurementPath("root.sg.d1.s1", TSDataType.INT32));
-      schemaMap.put("root.sg.d1.s2", new MeasurementPath("root.sg.d1.s2", TSDataType.INT32));
+      schemaMap.put("root.sg.d1.s2", new MeasurementPath("root.sg.d1.s2", TSDataType.DOUBLE));
+      schemaMap.put("root.sg.d1.s3", new MeasurementPath("root.sg.d1.s3", TSDataType.BOOLEAN));
       schemaMap.put("root.sg.d2.s1", new MeasurementPath("root.sg.d2.s1", TSDataType.INT32));
-      schemaMap.put("root.sg.d2.s2", new MeasurementPath("root.sg.d2.s2", TSDataType.INT32));
+      schemaMap.put("root.sg.d2.s2", new MeasurementPath("root.sg.d2.s2", TSDataType.DOUBLE));
+      schemaMap.put("root.sg.d2.s4", new MeasurementPath("root.sg.d2.s4", TSDataType.TEXT));
+
+      MeasurementPath aS1 = new MeasurementPath("root.sg.d2.a.s1", TSDataType.INT32);
+      MeasurementPath aS2 = new MeasurementPath("root.sg.d2.a.s2", TSDataType.DOUBLE);
+      AlignedPath alignedPath =
+          new AlignedPath(
+              "root.sg.d2.a",
+              Arrays.asList("s1", "s2"),
+              Arrays.asList(aS1.getMeasurementSchema(), aS2.getMeasurementSchema()));
+      aS1.setUnderAlignedEntity(true);
+      aS2.setUnderAlignedEntity(true);
+      schemaMap.put("root.sg.d2.a.s1", aS1);
+      schemaMap.put("root.sg.d2.a.s2", aS2);
+      schemaMap.put("root.sg.d2.a", alignedPath);
     } catch (IllegalPathException e) {
       e.printStackTrace();
     }
   }
 
-  /* 0. Simple Query */
+  /* Simple Query */
   static {
-    String sql = "SELECT * FROM root.sg.d1 LIMIT 10 OFFSET 10";
+    String sql = "SELECT ** FROM root.sg.d2 LIMIT 10 OFFSET 10";
 
     List<PlanNode> sourceNodeList = new ArrayList<>();
     sourceNodeList.add(
         new SeriesScanNode(
             new PlanNodeId("0"),
-            schemaMap.get("root.sg.d1.s1"),
-            Sets.newHashSet("s1", "s2"),
+            (MeasurementPath) schemaMap.get("root.sg.d2.s1"),
             OrderBy.TIMESTAMP_ASC));
     sourceNodeList.add(
         new SeriesScanNode(
             new PlanNodeId("1"),
-            schemaMap.get("root.sg.d1.s2"),
-            Sets.newHashSet("s1", "s2"),
+            (MeasurementPath) schemaMap.get("root.sg.d2.s2"),
+            OrderBy.TIMESTAMP_ASC));
+    sourceNodeList.add(
+        new SeriesScanNode(
+            new PlanNodeId("2"),
+            (MeasurementPath) schemaMap.get("root.sg.d2.s4"),
+            OrderBy.TIMESTAMP_ASC));
+    sourceNodeList.add(
+        new AlignedSeriesScanNode(
+            new PlanNodeId("3"),
+            (AlignedPath) schemaMap.get("root.sg.d2.a"),
             OrderBy.TIMESTAMP_ASC));
     TimeJoinNode timeJoinNode =
-        new TimeJoinNode(new PlanNodeId("2"), OrderBy.TIMESTAMP_ASC, sourceNodeList);
-    OffsetNode offsetNode = new OffsetNode(new PlanNodeId("3"), timeJoinNode, 10);
-    LimitNode limitNode = new LimitNode(new PlanNodeId("4"), offsetNode, 10);
+        new TimeJoinNode(new PlanNodeId("4"), OrderBy.TIMESTAMP_ASC, sourceNodeList);
+    OffsetNode offsetNode = new OffsetNode(new PlanNodeId("5"), timeJoinNode, 10);
+    LimitNode limitNode = new LimitNode(new PlanNodeId("6"), offsetNode, 10);
 
     querySQLs.add(sql);
     sqlToPlanMap.put(sql, limitNode);
   }
 
-  /* 1. Raw Data Query */
+  /* Raw Data Query */
   static {
     String sql =
         "SELECT s1 FROM root.sg.* WHERE time > 100 and s2 > 10 "
@@ -116,25 +144,24 @@ public class QueryLogicalPlanUtil {
     List<PlanNode> sourceNodeList = new ArrayList<>();
     sourceNodeList.add(
         new SeriesScanNode(
-            new PlanNodeId("test_query_0"),
-            schemaMap.get("root.sg.d1.s2"),
-            Sets.newHashSet("s2"),
+            new PlanNodeId("0"),
+            (MeasurementPath) schemaMap.get("root.sg.d1.s2"),
             OrderBy.TIMESTAMP_DESC));
     sourceNodeList.add(
         new SeriesScanNode(
-            new PlanNodeId("test_query_1"),
-            schemaMap.get("root.sg.d2.s1"),
-            Sets.newHashSet("s1", "s2"),
+            new PlanNodeId("1"),
+            (MeasurementPath) schemaMap.get("root.sg.d2.s1"),
             OrderBy.TIMESTAMP_DESC));
     sourceNodeList.add(
         new SeriesScanNode(
-            new PlanNodeId("test_query_2"),
-            schemaMap.get("root.sg.d2.s2"),
-            Sets.newHashSet("s1", "s2"),
+            new PlanNodeId("2"),
+            (MeasurementPath) schemaMap.get("root.sg.d2.s2"),
             OrderBy.TIMESTAMP_DESC));
+    sourceNodeList.forEach(
+        planNode -> ((SeriesScanNode) planNode).setTimeFilter(TimeFilter.gt(100)));
 
     TimeJoinNode timeJoinNode =
-        new TimeJoinNode(new PlanNodeId("test_query_3"), OrderBy.TIMESTAMP_DESC, sourceNodeList);
+        new TimeJoinNode(new PlanNodeId("3"), OrderBy.TIMESTAMP_DESC, sourceNodeList);
 
     GreaterThanExpression timeFilter =
         new GreaterThanExpression(
@@ -145,155 +172,282 @@ public class QueryLogicalPlanUtil {
             new ConstantOperand(TSDataType.INT32, "10"));
     GreaterThanExpression valueFilter2 =
         new GreaterThanExpression(
-            new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
+            new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")),
             new ConstantOperand(TSDataType.INT32, "10"));
-    LogicAndExpression expression =
-        new LogicAndExpression(
-            new LogicAndExpression(timeFilter, valueFilter1),
-            new LogicAndExpression(timeFilter, valueFilter2));
+    LogicAndExpression predicate =
+        new LogicAndExpression(timeFilter, new LogicAndExpression(valueFilter1, valueFilter2));
 
     FilterNode filterNode =
         new FilterNode(
-            new PlanNodeId("test_query_4"),
+            new PlanNodeId("4"),
             timeJoinNode,
-            new Expression[] {
-              new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
-              new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")),
-              new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")),
-            },
-            expression,
+            new Expression[] {new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))},
+            predicate,
             false,
-            ZoneId.systemDefault());
-
-    ProjectNode projectNode =
-        new ProjectNode(
-            new PlanNodeId("test_query_4"), filterNode, Collections.singletonList("root.sg.d2.s1"));
+            ZonedDateTime.now().getOffset());
 
     FilterNullNode filterNullNode =
         new FilterNullNode(
-            new PlanNodeId("test_query_5"),
-            projectNode,
+            new PlanNodeId("5"),
+            filterNode,
             FilterNullPolicy.CONTAINS_NULL,
-            new ArrayList<>());
+            Collections.singletonList(new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))));
 
-    OffsetNode offsetNode = new OffsetNode(new PlanNodeId("test_query_6"), filterNullNode, 100);
-    LimitNode limitNode = new LimitNode(new PlanNodeId("test_query_7"), offsetNode, 100);
+    OffsetNode offsetNode = new OffsetNode(new PlanNodeId("6"), filterNullNode, 100);
+    LimitNode limitNode = new LimitNode(new PlanNodeId("7"), offsetNode, 100);
 
-    //    querySQLs.add(sql);
-    //    sqlToPlanMap.put(sql, limitNode);
+    querySQLs.add(sql);
+    sqlToPlanMap.put(sql, limitNode);
   }
 
-  /* 2. Raw Data Query (align by device) */
+  /* Raw Data Query (align by device) */
   static {
     String sql =
         "SELECT * FROM root.sg.* WHERE time > 100 and s1 > 10 "
-            + "ORDER BY TIME DESC WITHOUT NULL ANY LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
+            + "ORDER BY TIME DESC LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
 
     List<PlanNode> sourceNodeList1 = new ArrayList<>();
-    List<PlanNode> sourceNodeList2 = new ArrayList<>();
     sourceNodeList1.add(
         new SeriesScanNode(
-            new PlanNodeId("test_query_0"),
-            schemaMap.get("root.sg.d1.s1"),
-            Sets.newHashSet("s1", "s2"),
+            new PlanNodeId("0"),
+            (MeasurementPath) schemaMap.get("root.sg.d1.s3"),
             OrderBy.TIMESTAMP_DESC));
     sourceNodeList1.add(
         new SeriesScanNode(
-            new PlanNodeId("test_query_1"),
-            schemaMap.get("root.sg.d1.s2"),
-            Sets.newHashSet("s1", "s2"),
-            OrderBy.TIMESTAMP_DESC));
-    sourceNodeList2.add(
-        new SeriesScanNode(
-            new PlanNodeId("test_query_2"),
-            schemaMap.get("root.sg.d2.s1"),
-            Sets.newHashSet("s1", "s2"),
+            new PlanNodeId("1"),
+            (MeasurementPath) schemaMap.get("root.sg.d1.s1"),
             OrderBy.TIMESTAMP_DESC));
-    sourceNodeList2.add(
+    sourceNodeList1.add(
         new SeriesScanNode(
-            new PlanNodeId("test_query_3"),
-            schemaMap.get("root.sg.d2.s2"),
-            Sets.newHashSet("s1", "s2"),
+            new PlanNodeId("2"),
+            (MeasurementPath) schemaMap.get("root.sg.d1.s2"),
             OrderBy.TIMESTAMP_DESC));
+    sourceNodeList1.forEach(
+        planNode -> ((SeriesScanNode) planNode).setTimeFilter(TimeFilter.gt(100)));
 
     TimeJoinNode timeJoinNode1 =
-        new TimeJoinNode(new PlanNodeId("test_query_4"), OrderBy.TIMESTAMP_DESC, sourceNodeList1);
-    TimeJoinNode timeJoinNode2 =
-        new TimeJoinNode(new PlanNodeId("test_query_5"), OrderBy.TIMESTAMP_DESC, sourceNodeList2);
+        new TimeJoinNode(new PlanNodeId("3"), OrderBy.TIMESTAMP_DESC, sourceNodeList1);
 
     GreaterThanExpression timeFilter =
         new GreaterThanExpression(
             new TimestampOperand(), new ConstantOperand(TSDataType.INT64, "100"));
     GreaterThanExpression valueFilter1 =
         new GreaterThanExpression(
-            new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
-            new ConstantOperand(TSDataType.INT32, "10"));
-    GreaterThanExpression valueFilter2 =
-        new GreaterThanExpression(
-            new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")),
-            new ConstantOperand(TSDataType.INT32, "10"));
-    LogicAndExpression expression =
-        new LogicAndExpression(
-            new LogicAndExpression(timeFilter, valueFilter1),
-            new LogicAndExpression(timeFilter, valueFilter2));
+            new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")),
+            new ConstantOperand(TSDataType.INT64, "10"));
+    LogicAndExpression predicate1 = new LogicAndExpression(timeFilter, valueFilter1);
 
     FilterNode filterNode1 =
         new FilterNode(
-            new PlanNodeId("test_query_6"),
+            new PlanNodeId("4"),
             timeJoinNode1,
             new Expression[] {
+              new TimeSeriesOperand(schemaMap.get("root.sg.d1.s3")),
               new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")),
-              new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
+              new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2"))
             },
-            expression,
+            predicate1,
             false,
-            ZoneId.systemDefault());
+            ZonedDateTime.now().getOffset());
+
+    List<PlanNode> sourceNodeList2 = new ArrayList<>();
+    sourceNodeList2.add(
+        new SeriesScanNode(
+            new PlanNodeId("5"),
+            (MeasurementPath) schemaMap.get("root.sg.d2.s4"),
+            OrderBy.TIMESTAMP_DESC));
+    sourceNodeList2.add(
+        new SeriesScanNode(
+            new PlanNodeId("6"),
+            (MeasurementPath) schemaMap.get("root.sg.d2.s1"),
+            OrderBy.TIMESTAMP_DESC));
+    sourceNodeList2.add(
+        new SeriesScanNode(
+            new PlanNodeId("7"),
+            (MeasurementPath) schemaMap.get("root.sg.d2.s2"),
+            OrderBy.TIMESTAMP_DESC));
+    sourceNodeList2.forEach(
+        planNode -> ((SeriesScanNode) planNode).setTimeFilter(TimeFilter.gt(100)));
+
+    TimeJoinNode timeJoinNode2 =
+        new TimeJoinNode(new PlanNodeId("8"), OrderBy.TIMESTAMP_DESC, sourceNodeList2);
+
+    GreaterThanExpression valueFilter2 =
+        new GreaterThanExpression(
+            new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")),
+            new ConstantOperand(TSDataType.INT32, "10"));
+    LogicAndExpression predicate2 = new LogicAndExpression(timeFilter, valueFilter2);
+
     FilterNode filterNode2 =
         new FilterNode(
-            new PlanNodeId("test_query_7"),
+            new PlanNodeId("9"),
             timeJoinNode2,
             new Expression[] {
+              new TimeSeriesOperand(schemaMap.get("root.sg.d2.s4")),
               new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")),
-              new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")),
+              new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2"))
             },
-            expression,
+            predicate2,
             false,
-            ZoneId.systemDefault());
+            ZonedDateTime.now().getOffset());
 
+    Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
+    deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 3, 4));
+    deviceToMeasurementIndexesMap.put("root.sg.d2", Arrays.asList(2, 3, 4));
     DeviceViewNode deviceViewNode =
         new DeviceViewNode(
-            new PlanNodeId("test_query_8"),
+            new PlanNodeId("10"),
             Arrays.asList(OrderBy.DEVICE_ASC, OrderBy.TIMESTAMP_DESC),
-            Arrays.asList("s1", "s2"));
+            Arrays.asList(HeaderConstant.COLUMN_DEVICE, "s3", "s4", "s1", "s2"),
+            deviceToMeasurementIndexesMap);
     deviceViewNode.addChildDeviceNode("root.sg.d1", filterNode1);
     deviceViewNode.addChildDeviceNode("root.sg.d2", filterNode2);
 
-    FilterNullNode filterNullNode =
-        new FilterNullNode(
-            new PlanNodeId("test_query_9"),
-            deviceViewNode,
-            FilterNullPolicy.CONTAINS_NULL,
-            new ArrayList<>());
+    OffsetNode offsetNode = new OffsetNode(new PlanNodeId("11"), deviceViewNode, 100);
+    LimitNode limitNode = new LimitNode(new PlanNodeId("12"), offsetNode, 100);
+
+    querySQLs.add(sql);
+    sqlToPlanMap.put(sql, limitNode);
+  }
+
+  /* Simple Aggregation Query */
+  static {
+    String sql =
+        "SELECT last_value(s1), first_value(s1), sum(s2) FROM root.sg.** WHERE time > 100 LIMIT 10 OFFSET 10";
+
+    List<PlanNode> sourceNodeList = new ArrayList<>();
+    Filter timeFilter = TimeFilter.gt(100);
+    sourceNodeList.add(
+        new SeriesAggregationScanNode(
+            new PlanNodeId("0"),
+            (MeasurementPath) schemaMap.get("root.sg.d1.s1"),
+            Collections.singletonList(
+                new AggregationDescriptor(
+                    AggregationType.FIRST_VALUE,
+                    AggregationStep.SINGLE,
+                    Collections.singletonList(
+                        new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))))),
+            OrderBy.TIMESTAMP_ASC,
+            null));
+    sourceNodeList.add(
+        new SeriesAggregationScanNode(
+            new PlanNodeId("1"),
+            (MeasurementPath) schemaMap.get("root.sg.d1.s2"),
+            Collections.singletonList(
+                new AggregationDescriptor(
+                    AggregationType.SUM,
+                    AggregationStep.SINGLE,
+                    Collections.singletonList(
+                        new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2"))))),
+            OrderBy.TIMESTAMP_ASC,
+            null));
+    sourceNodeList.add(
+        new SeriesAggregationScanNode(
+            new PlanNodeId("2"),
+            (MeasurementPath) schemaMap.get("root.sg.d1.s1"),
+            Collections.singletonList(
+                new AggregationDescriptor(
+                    AggregationType.LAST_VALUE,
+                    AggregationStep.SINGLE,
+                    Collections.singletonList(
+                        new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))))),
+            OrderBy.TIMESTAMP_ASC,
+            null));
+    sourceNodeList.add(
+        new SeriesAggregationScanNode(
+            new PlanNodeId("3"),
+            (MeasurementPath) schemaMap.get("root.sg.d2.s1"),
+            Collections.singletonList(
+                new AggregationDescriptor(
+                    AggregationType.FIRST_VALUE,
+                    AggregationStep.SINGLE,
+                    Collections.singletonList(
+                        new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))))),
+            OrderBy.TIMESTAMP_ASC,
+            null));
+    sourceNodeList.add(
+        new SeriesAggregationScanNode(
+            new PlanNodeId("4"),
+            (MeasurementPath) schemaMap.get("root.sg.d2.s2"),
+            Collections.singletonList(
+                new AggregationDescriptor(
+                    AggregationType.SUM,
+                    AggregationStep.SINGLE,
+                    Collections.singletonList(
+                        new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2"))))),
+            OrderBy.TIMESTAMP_ASC,
+            null));
+    sourceNodeList.add(
+        new SeriesAggregationScanNode(
+            new PlanNodeId("5"),
+            (MeasurementPath) schemaMap.get("root.sg.d2.s1"),
+            Collections.singletonList(
+                new AggregationDescriptor(
+                    AggregationType.LAST_VALUE,
+                    AggregationStep.SINGLE,
+                    Collections.singletonList(
+                        new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))))),
+            OrderBy.TIMESTAMP_ASC,
+            null));
+    sourceNodeList.add(
+        new AlignedSeriesAggregationScanNode(
+            new PlanNodeId("6"),
+            (AlignedPath) schemaMap.get("root.sg.d2.a"),
+            Arrays.asList(
+                new AggregationDescriptor(
+                    AggregationType.FIRST_VALUE,
+                    AggregationStep.SINGLE,
+                    Collections.singletonList(
+                        new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s1")))),
+                new AggregationDescriptor(
+                    AggregationType.SUM,
+                    AggregationStep.SINGLE,
+                    Collections.singletonList(
+                        new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s2"))))),
+            OrderBy.TIMESTAMP_ASC,
+            null));
+    sourceNodeList.add(
+        new AlignedSeriesAggregationScanNode(
+            new PlanNodeId("7"),
+            new AlignedPath((MeasurementPath) schemaMap.get("root.sg.d2.a.s1")),
+            Collections.singletonList(
+                new AggregationDescriptor(
+                    AggregationType.LAST_VALUE,
+                    AggregationStep.SINGLE,
+                    Collections.singletonList(
+                        new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s1"))))),
+            OrderBy.TIMESTAMP_ASC,
+            null));
+    sourceNodeList.forEach(
+        node -> {
+          if (node instanceof SeriesAggregationScanNode) {
+            ((SeriesAggregationScanNode) node).setTimeFilter(timeFilter);
+          } else {
+            ((AlignedSeriesAggregationScanNode) node).setTimeFilter(timeFilter);
+          }
+        });
 
-    OffsetNode offsetNode = new OffsetNode(new PlanNodeId("test_query_10"), filterNullNode, 100);
-    LimitNode limitNode = new LimitNode(new PlanNodeId("test_query_11"), offsetNode, 100);
+    TimeJoinNode timeJoinNode =
+        new TimeJoinNode(new PlanNodeId("8"), OrderBy.TIMESTAMP_ASC, sourceNodeList);
+    OffsetNode offsetNode = new OffsetNode(new PlanNodeId("9"), timeJoinNode, 10);
+    LimitNode limitNode = new LimitNode(new PlanNodeId("10"), offsetNode, 10);
 
-    //    querySQLs.add(sql);
-    //    sqlToPlanMap.put(sql, limitNode);
+    querySQLs.add(sql);
+    sqlToPlanMap.put(sql, limitNode);
   }
 
-  /* 3. Aggregation Query (without value filter) */
+  /* Aggregation Query (without value filter) */
   static {
     String sql =
-        "SELECT count(s1), max_value(s2), last_value(s1) FROM root.sg.* WHERE time > 100 "
-            + "GROUP BY LEVEL = 1 ORDER BY TIME DESC WITHOUT NULL ANY LIMIT 100 OFFSET 100";
+        "SELECT count(s1), max_value(s2), last_value(s1) FROM root.sg.** WHERE time > 100 "
+            + "GROUP BY LEVEL = 1 ORDER BY TIME DESC LIMIT 100 OFFSET 100";
 
     List<PlanNode> sourceNodeList = new ArrayList<>();
     Filter timeFilter = TimeFilter.gt(100);
     sourceNodeList.add(
         new SeriesAggregationScanNode(
-            new PlanNodeId("test_query_0"),
-            schemaMap.get("root.sg.d1.s1"),
+            new PlanNodeId("0"),
+            (MeasurementPath) schemaMap.get("root.sg.d1.s1"),
             Arrays.asList(
                 new AggregationDescriptor(
                     AggregationType.COUNT,
@@ -305,24 +459,24 @@ public class QueryLogicalPlanUtil {
                     AggregationStep.PARTIAL,
                     Collections.singletonList(
                         new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))))),
-            Sets.newHashSet("s1", "s2"),
-            OrderBy.TIMESTAMP_DESC));
+            OrderBy.TIMESTAMP_DESC,
+            null));
     sourceNodeList.add(
         new SeriesAggregationScanNode(
-            new PlanNodeId("test_query_1"),
-            schemaMap.get("root.sg.d1.s2"),
+            new PlanNodeId("1"),
+            (MeasurementPath) schemaMap.get("root.sg.d1.s2"),
             Collections.singletonList(
                 new AggregationDescriptor(
                     AggregationType.MAX_VALUE,
                     AggregationStep.PARTIAL,
                     Collections.singletonList(
                         new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2"))))),
-            Sets.newHashSet("s1", "s2"),
-            OrderBy.TIMESTAMP_DESC));
+            OrderBy.TIMESTAMP_DESC,
+            null));
     sourceNodeList.add(
         new SeriesAggregationScanNode(
-            new PlanNodeId("test_query_0"),
-            schemaMap.get("root.sg.d2.s1"),
+            new PlanNodeId("2"),
+            (MeasurementPath) schemaMap.get("root.sg.d2.s1"),
             Arrays.asList(
                 new AggregationDescriptor(
                     AggregationType.COUNT,
@@ -334,193 +488,269 @@ public class QueryLogicalPlanUtil {
                     AggregationStep.PARTIAL,
                     Collections.singletonList(
                         new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))))),
-            Sets.newHashSet("s1", "s2"),
-            OrderBy.TIMESTAMP_DESC));
+            OrderBy.TIMESTAMP_DESC,
+            null));
     sourceNodeList.add(
         new SeriesAggregationScanNode(
-            new PlanNodeId("test_query_1"),
-            schemaMap.get("root.sg.d2.s2"),
+            new PlanNodeId("3"),
+            (MeasurementPath) schemaMap.get("root.sg.d2.s2"),
             Collections.singletonList(
                 new AggregationDescriptor(
                     AggregationType.MAX_VALUE,
                     AggregationStep.PARTIAL,
                     Collections.singletonList(
                         new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2"))))),
-            Sets.newHashSet("s1", "s2"),
-            OrderBy.TIMESTAMP_DESC));
-    sourceNodeList.forEach(node -> ((SeriesAggregationScanNode) node).setTimeFilter(timeFilter));
+            OrderBy.TIMESTAMP_DESC,
+            null));
+    sourceNodeList.add(
+        new AlignedSeriesAggregationScanNode(
+            new PlanNodeId("4"),
+            (AlignedPath) schemaMap.get("root.sg.d2.a"),
+            Arrays.asList(
+                new AggregationDescriptor(
+                    AggregationType.LAST_VALUE,
+                    AggregationStep.PARTIAL,
+                    Collections.singletonList(
+                        new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s1")))),
+                new AggregationDescriptor(
+                    AggregationType.COUNT,
+                    AggregationStep.PARTIAL,
+                    Collections.singletonList(
+                        new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s1")))),
+                new AggregationDescriptor(
+                    AggregationType.MAX_VALUE,
+                    AggregationStep.PARTIAL,
+                    Collections.singletonList(
+                        new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s2"))))),
+            OrderBy.TIMESTAMP_DESC,
+            null));
+    sourceNodeList.forEach(
+        node -> {
+          if (node instanceof SeriesAggregationScanNode) {
+            ((SeriesAggregationScanNode) node).setTimeFilter(timeFilter);
+          } else {
+            ((AlignedSeriesAggregationScanNode) node).setTimeFilter(timeFilter);
+          }
+        });
 
     GroupByLevelNode groupByLevelNode =
         new GroupByLevelNode(
-            new PlanNodeId("test_query_5"),
+            new PlanNodeId("5"),
             sourceNodeList,
             Arrays.asList(
                 new AggregationDescriptor(
                     AggregationType.COUNT,
                     AggregationStep.FINAL,
                     Arrays.asList(
-                        new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")),
-                        new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")))),
+                        new FunctionExpression(
+                            SQLConstant.COUNT,
+                            new LinkedHashMap<>(),
+                            Collections.singletonList(
+                                new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")))),
+                        new FunctionExpression(
+                            SQLConstant.COUNT,
+                            new LinkedHashMap<>(),
+                            Collections.singletonList(
+                                new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")))))),
+                new AggregationDescriptor(
+                    AggregationType.COUNT,
+                    AggregationStep.FINAL,
+                    Collections.singletonList(
+                        new FunctionExpression(
+                            SQLConstant.COUNT,
+                            new LinkedHashMap<>(),
+                            Collections.singletonList(
+                                new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s1")))))),
                 new AggregationDescriptor(
                     AggregationType.MAX_VALUE,
                     AggregationStep.FINAL,
                     Arrays.asList(
-                        new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
-                        new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")))),
+                        new FunctionExpression(
+                            SQLConstant.MAX_VALUE,
+                            new LinkedHashMap<>(),
+                            Collections.singletonList(
+                                new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")))),
+                        new FunctionExpression(
+                            SQLConstant.MAX_VALUE,
+                            new LinkedHashMap<>(),
+                            Collections.singletonList(
+                                new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")))))),
+                new AggregationDescriptor(
+                    AggregationType.MAX_VALUE,
+                    AggregationStep.FINAL,
+                    Collections.singletonList(
+                        new FunctionExpression(
+                            SQLConstant.MAX_VALUE,
+                            new LinkedHashMap<>(),
+                            Collections.singletonList(
+                                new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s2")))))),
                 new AggregationDescriptor(
                     AggregationType.LAST_VALUE,
                     AggregationStep.FINAL,
                     Arrays.asList(
-                        new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")),
-                        new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))))),
+                        new FunctionExpression(
+                            SQLConstant.LAST_VALUE,
+                            new LinkedHashMap<>(),
+                            Collections.singletonList(
+                                new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")))),
+                        new FunctionExpression(
+                            SQLConstant.LAST_VALUE,
+                            new LinkedHashMap<>(),
+                            Collections.singletonList(
+                                new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")))))),
+                new AggregationDescriptor(
+                    AggregationType.LAST_VALUE,
+                    AggregationStep.FINAL,
+                    Collections.singletonList(
+                        new FunctionExpression(
+                            SQLConstant.LAST_VALUE,
+                            new LinkedHashMap<>(),
+                            Collections.singletonList(
+                                new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s1"))))))),
             Arrays.asList(
-                "count(root.sg.*.s1)", "max_value(root.sg.*.s2)", "last_value(root.sg.*.s1)"));
+                "count(root.sg.*.s1)",
+                "count(root.sg.*.*.s1)",
+                "max_value(root.sg.*.s2)",
+                "max_value(root.sg.*.*.s2)",
+                "last_value(root.sg.*.s1)",
+                "last_value(root.sg.*.*.s1)"));
 
-    FilterNullNode filterNullNode =
-        new FilterNullNode(
-            new PlanNodeId("test_query_6"),
-            groupByLevelNode,
-            FilterNullPolicy.CONTAINS_NULL,
-            new ArrayList<>());
+    OffsetNode offsetNode = new OffsetNode(new PlanNodeId("6"), groupByLevelNode, 100);
+    LimitNode limitNode = new LimitNode(new PlanNodeId("7"), offsetNode, 100);
 
-    OffsetNode offsetNode = new OffsetNode(new PlanNodeId("test_query_7"), filterNullNode, 100);
-    LimitNode limitNode = new LimitNode(new PlanNodeId("test_query_8"), offsetNode, 100);
-
-    //    querySQLs.add(sql);
-    //    sqlToPlanMap.put(sql, limitNode);
+    querySQLs.add(sql);
+    sqlToPlanMap.put(sql, limitNode);
   }
 
-  /* 4. Aggregation Query (without value filter and align by device) */
+  /* Aggregation Query (without value filter and align by device) */
   static {
     String sql =
         "SELECT count(s1), max_value(s2), last_value(s1) FROM root.sg.* WHERE time > 100 "
-            + "ORDER BY TIME DESC WITHOUT NULL ANY LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
+            + "ORDER BY TIME DESC LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
 
-    List<PlanNode> sourceNodeList1 = new ArrayList<>();
-    List<PlanNode> sourceNodeList2 = new ArrayList<>();
     Filter timeFilter = TimeFilter.gt(100);
+    List<PlanNode> sourceNodeList1 = new ArrayList<>();
     sourceNodeList1.add(
         new SeriesAggregationScanNode(
-            new PlanNodeId("test_query_0"),
-            schemaMap.get("root.sg.d1.s1"),
+            new PlanNodeId("0"),
+            (MeasurementPath) schemaMap.get("root.sg.d1.s1"),
             Arrays.asList(
                 new AggregationDescriptor(
                     AggregationType.COUNT,
-                    AggregationStep.PARTIAL,
+                    AggregationStep.SINGLE,
                     Collections.singletonList(
                         new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")))),
                 new AggregationDescriptor(
                     AggregationType.LAST_VALUE,
-                    AggregationStep.PARTIAL,
+                    AggregationStep.SINGLE,
                     Collections.singletonList(
                         new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))))),
-            Sets.newHashSet("s1", "s2"),
-            OrderBy.TIMESTAMP_DESC));
+            OrderBy.TIMESTAMP_DESC,
+            null));
     sourceNodeList1.add(
         new SeriesAggregationScanNode(
-            new PlanNodeId("test_query_1"),
-            schemaMap.get("root.sg.d1.s2"),
+            new PlanNodeId("1"),
+            (MeasurementPath) schemaMap.get("root.sg.d1.s2"),
             Collections.singletonList(
                 new AggregationDescriptor(
                     AggregationType.MAX_VALUE,
-                    AggregationStep.PARTIAL,
+                    AggregationStep.SINGLE,
                     Collections.singletonList(
                         new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2"))))),
-            Sets.newHashSet("s1", "s2"),
-            OrderBy.TIMESTAMP_DESC));
+            OrderBy.TIMESTAMP_DESC,
+            null));
+    sourceNodeList1.forEach(node -> ((SeriesAggregationScanNode) node).setTimeFilter(timeFilter));
+
+    TimeJoinNode timeJoinNode1 =
+        new TimeJoinNode(new PlanNodeId("2"), OrderBy.TIMESTAMP_DESC, sourceNodeList1);
+
+    List<PlanNode> sourceNodeList2 = new ArrayList<>();
     sourceNodeList2.add(
         new SeriesAggregationScanNode(
-            new PlanNodeId("test_query_0"),
-            schemaMap.get("root.sg.d2.s1"),
+            new PlanNodeId("3"),
+            (MeasurementPath) schemaMap.get("root.sg.d2.s1"),
             Arrays.asList(
                 new AggregationDescriptor(
                     AggregationType.COUNT,
-                    AggregationStep.PARTIAL,
+                    AggregationStep.SINGLE,
                     Collections.singletonList(
                         new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")))),
                 new AggregationDescriptor(
                     AggregationType.LAST_VALUE,
-                    AggregationStep.PARTIAL,
+                    AggregationStep.SINGLE,
                     Collections.singletonList(
                         new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))))),
-            Sets.newHashSet("s1", "s2"),
-            OrderBy.TIMESTAMP_DESC));
+            OrderBy.TIMESTAMP_DESC,
+            null));
     sourceNodeList2.add(
         new SeriesAggregationScanNode(
-            new PlanNodeId("test_query_1"),
-            schemaMap.get("root.sg.d2.s2"),
+            new PlanNodeId("4"),
+            (MeasurementPath) schemaMap.get("root.sg.d2.s2"),
             Collections.singletonList(
                 new AggregationDescriptor(
                     AggregationType.MAX_VALUE,
-                    AggregationStep.PARTIAL,
+                    AggregationStep.SINGLE,
                     Collections.singletonList(
                         new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2"))))),
-            Sets.newHashSet("s1", "s2"),
-            OrderBy.TIMESTAMP_DESC));
-    sourceNodeList1.forEach(node -> ((SeriesAggregationScanNode) node).setTimeFilter(timeFilter));
+            OrderBy.TIMESTAMP_DESC,
+            null));
     sourceNodeList2.forEach(node -> ((SeriesAggregationScanNode) node).setTimeFilter(timeFilter));
 
-    TimeJoinNode timeJoinNode1 =
-        new TimeJoinNode(new PlanNodeId("test_query_4"), OrderBy.TIMESTAMP_DESC, sourceNodeList1);
     TimeJoinNode timeJoinNode2 =
-        new TimeJoinNode(new PlanNodeId("test_query_5"), OrderBy.TIMESTAMP_DESC, sourceNodeList2);
+        new TimeJoinNode(new PlanNodeId("5"), OrderBy.TIMESTAMP_DESC, sourceNodeList2);
 
+    Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
+    deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 2, 3));
+    deviceToMeasurementIndexesMap.put("root.sg.d2", Arrays.asList(1, 2, 3));
     DeviceViewNode deviceViewNode =
         new DeviceViewNode(
-            new PlanNodeId("test_query_6"),
+            new PlanNodeId("6"),
             Arrays.asList(OrderBy.DEVICE_ASC, OrderBy.TIMESTAMP_DESC),
-            Arrays.asList("count(s1)", "max_value(s2)", "last_value(s1)"));
+            Arrays.asList(
+                HeaderConstant.COLUMN_DEVICE, "count(s1)", "max_value(s2)", "last_value(s1)"),
+            deviceToMeasurementIndexesMap);
     deviceViewNode.addChildDeviceNode("root.sg.d1", timeJoinNode1);
     deviceViewNode.addChildDeviceNode("root.sg.d2", timeJoinNode2);
 
-    FilterNullNode filterNullNode =
-        new FilterNullNode(
-            new PlanNodeId("test_query_7"),
-            deviceViewNode,
-            FilterNullPolicy.CONTAINS_NULL,
-            new ArrayList<>());
-
-    OffsetNode offsetNode = new OffsetNode(new PlanNodeId("test_query_8"), filterNullNode, 100);
-    LimitNode limitNode = new LimitNode(new PlanNodeId("test_query_9"), offsetNode, 100);
+    OffsetNode offsetNode = new OffsetNode(new PlanNodeId("7"), deviceViewNode, 100);
+    LimitNode limitNode = new LimitNode(new PlanNodeId("8"), offsetNode, 100);
 
-    //    querySQLs.add(sql);
-    //    sqlToPlanMap.put(sql, limitNode);
+    querySQLs.add(sql);
+    sqlToPlanMap.put(sql, limitNode);
   }
 
-  /* 5. Aggregation Query (with value filter) */
+  /* Aggregation Query (with value filter) */
   static {
     String sql =
         "SELECT count(s1), max_value(s2), last_value(s1) FROM root.sg.* WHERE time > 100 and s2 > 10 "
-            + "GROUP BY LEVEL = 1 ORDER BY TIME DESC WITHOUT NULL ANY LIMIT 100 OFFSET 100";
+            + "GROUP BY LEVEL = 1 ORDER BY TIME DESC LIMIT 100 OFFSET 100";
 
     List<PlanNode> sourceNodeList = new ArrayList<>();
     sourceNodeList.add(
         new SeriesScanNode(
-            new PlanNodeId("test_query_0"),
-            schemaMap.get("root.sg.d1.s1"),
-            Sets.newHashSet("s1", "s2"),
+            new PlanNodeId("0"),
+            (MeasurementPath) schemaMap.get("root.sg.d1.s1"),
             OrderBy.TIMESTAMP_DESC));
     sourceNodeList.add(
         new SeriesScanNode(
-            new PlanNodeId("test_query_1"),
-            schemaMap.get("root.sg.d1.s2"),
-            Sets.newHashSet("s1", "s2"),
+            new PlanNodeId("1"),
+            (MeasurementPath) schemaMap.get("root.sg.d1.s2"),
             OrderBy.TIMESTAMP_DESC));
     sourceNodeList.add(
         new SeriesScanNode(
-            new PlanNodeId("test_query_2"),
-            schemaMap.get("root.sg.d2.s1"),
-            Sets.newHashSet("s1", "s2"),
+            new PlanNodeId("2"),
+            (MeasurementPath) schemaMap.get("root.sg.d2.s1"),
             OrderBy.TIMESTAMP_DESC));
     sourceNodeList.add(
         new SeriesScanNode(
-            new PlanNodeId("test_query_3"),
-            schemaMap.get("root.sg.d2.s2"),
-            Sets.newHashSet("s1", "s2"),
+            new PlanNodeId("3"),
+            (MeasurementPath) schemaMap.get("root.sg.d2.s2"),
             OrderBy.TIMESTAMP_DESC));
+    sourceNodeList.forEach(
+        planNode -> ((SeriesScanNode) planNode).setTimeFilter(TimeFilter.gt(100)));
 
     TimeJoinNode timeJoinNode =
-        new TimeJoinNode(new PlanNodeId("test_query_4"), OrderBy.TIMESTAMP_DESC, sourceNodeList);
+        new TimeJoinNode(new PlanNodeId("4"), OrderBy.TIMESTAMP_DESC, sourceNodeList);
 
     GreaterThanExpression timeFilter =
         new GreaterThanExpression(
@@ -533,27 +763,25 @@ public class QueryLogicalPlanUtil {
         new GreaterThanExpression(
             new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")),
             new ConstantOperand(TSDataType.INT32, "10"));
-    LogicAndExpression expression =
-        new LogicAndExpression(
-            new LogicAndExpression(timeFilter, valueFilter1),
-            new LogicAndExpression(timeFilter, valueFilter2));
+    LogicAndExpression predicate =
+        new LogicAndExpression(timeFilter, new LogicAndExpression(valueFilter1, valueFilter2));
     FilterNode filterNode =
         new FilterNode(
-            new PlanNodeId("test_query_5"),
+            new PlanNodeId("5"),
             timeJoinNode,
             new Expression[] {
-              new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")),
-              new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
               new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")),
+              new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")),
               new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")),
+              new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
             },
-            expression,
+            predicate,
             false,
-            ZoneId.systemDefault());
+            ZonedDateTime.now().getOffset());
 
     AggregationNode aggregationNode =
         new AggregationNode(
-            new PlanNodeId("test_query_6"),
+            new PlanNodeId("6"),
             Collections.singletonList(filterNode),
             Arrays.asList(
                 new AggregationDescriptor(
@@ -562,108 +790,112 @@ public class QueryLogicalPlanUtil {
                     Collections.singletonList(
                         new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")))),
                 new AggregationDescriptor(
-                    AggregationType.LAST_VALUE,
+                    AggregationType.MAX_VALUE,
                     AggregationStep.PARTIAL,
                     Collections.singletonList(
-                        new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")))),
+                        new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")))),
                 new AggregationDescriptor(
-                    AggregationType.MAX_VALUE,
+                    AggregationType.LAST_VALUE,
                     AggregationStep.PARTIAL,
                     Collections.singletonList(
-                        new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")))),
+                        new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")))),
                 new AggregationDescriptor(
                     AggregationType.COUNT,
                     AggregationStep.PARTIAL,
                     Collections.singletonList(
                         new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")))),
                 new AggregationDescriptor(
-                    AggregationType.LAST_VALUE,
+                    AggregationType.MAX_VALUE,
                     AggregationStep.PARTIAL,
                     Collections.singletonList(
-                        new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")))),
+                        new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")))),
                 new AggregationDescriptor(
-                    AggregationType.MAX_VALUE,
+                    AggregationType.LAST_VALUE,
                     AggregationStep.PARTIAL,
                     Collections.singletonList(
-                        new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2"))))));
+                        new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))))));
 
     GroupByLevelNode groupByLevelNode =
         new GroupByLevelNode(
-            new PlanNodeId("test_query_7"),
+            new PlanNodeId("7"),
             Collections.singletonList(aggregationNode),
             Arrays.asList(
                 new AggregationDescriptor(
                     AggregationType.COUNT,
                     AggregationStep.FINAL,
                     Arrays.asList(
-                        new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")),
-                        new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")))),
+                        new FunctionExpression(
+                            SQLConstant.COUNT,
+                            new LinkedHashMap<>(),
+                            Collections.singletonList(
+                                new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")))),
+                        new FunctionExpression(
+                            SQLConstant.COUNT,
+                            new LinkedHashMap<>(),
+                            Collections.singletonList(
+                                new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")))))),
                 new AggregationDescriptor(
-                    AggregationType.LAST_VALUE,
+                    AggregationType.MAX_VALUE,
                     AggregationStep.FINAL,
                     Arrays.asList(
-                        new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")),
-                        new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")))),
+                        new FunctionExpression(
+                            SQLConstant.MAX_VALUE,
+                            new LinkedHashMap<>(),
+                            Collections.singletonList(
+                                new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")))),
+                        new FunctionExpression(
+                            SQLConstant.MAX_VALUE,
+                            new LinkedHashMap<>(),
+                            Collections.singletonList(
+                                new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")))))),
                 new AggregationDescriptor(
-                    AggregationType.MAX_VALUE,
+                    AggregationType.LAST_VALUE,
                     AggregationStep.FINAL,
                     Arrays.asList(
-                        new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
-                        new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2"))))),
+                        new FunctionExpression(
+                            SQLConstant.LAST_VALUE,
+                            new LinkedHashMap<>(),
+                            Collections.singletonList(
+                                new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")))),
+                        new FunctionExpression(
+                            SQLConstant.LAST_VALUE,
+                            new LinkedHashMap<>(),
+                            Collections.singletonList(
+                                new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))))))),
             Arrays.asList(
                 "count(root.sg.*.s1)", "max_value(root.sg.*.s2)", "last_value(root.sg.*.s1)"));
 
-    FilterNullNode filterNullNode =
-        new FilterNullNode(
-            new PlanNodeId("test_query_8"),
-            groupByLevelNode,
-            FilterNullPolicy.CONTAINS_NULL,
-            new ArrayList<>());
+    OffsetNode offsetNode = new OffsetNode(new PlanNodeId("8"), groupByLevelNode, 100);
+    LimitNode limitNode = new LimitNode(new PlanNodeId("9"), offsetNode, 100);
 
-    OffsetNode offsetNode = new OffsetNode(new PlanNodeId("test_query_9"), filterNullNode, 100);
-    LimitNode limitNode = new LimitNode(new PlanNodeId("test_query_10"), offsetNode, 100);
-
-    //    querySQLs.add(sql);
-    //    sqlToPlanMap.put(sql, limitNode);
+    querySQLs.add(sql);
+    sqlToPlanMap.put(sql, limitNode);
   }
 
-  /* 6. Aggregation Query (with value filter and align by device) */
+  /* Aggregation Query (with value filter and align by device) */
   static {
     String sql =
         "SELECT count(s1), max_value(s2), last_value(s1) FROM root.sg.* WHERE time > 100 and s2 > 10 "
-            + "ORDER BY TIME DESC WITHOUT NULL ANY LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
+            + "ORDER BY TIME DESC LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
 
     List<PlanNode> sourceNodeList1 = new ArrayList<>();
-    List<PlanNode> sourceNodeList2 = new ArrayList<>();
     sourceNodeList1.add(
         new SeriesScanNode(
-            new PlanNodeId("test_query_0"),
-            schemaMap.get("root.sg.d1.s1"),
-            Sets.newHashSet("s1", "s2"),
+            new PlanNodeId("0"),
+            (MeasurementPath) schemaMap.get("root.sg.d1.s1"),
             OrderBy.TIMESTAMP_DESC));
     sourceNodeList1.add(
         new SeriesScanNode(
-            new PlanNodeId("test_query_1"),
-            schemaMap.get("root.sg.d1.s2"),
-            Sets.newHashSet("s1", "s2"),
-            OrderBy.TIMESTAMP_DESC));
-    sourceNodeList2.add(
-        new SeriesScanNode(
-            new PlanNodeId("test_query_2"),
-            schemaMap.get("root.sg.d2.s1"),
-            Sets.newHashSet("s1", "s2"),
-            OrderBy.TIMESTAMP_DESC));
-    sourceNodeList2.add(
-        new SeriesScanNode(
-            new PlanNodeId("test_query_3"),
-            schemaMap.get("root.sg.d2.s2"),
-            Sets.newHashSet("s1", "s2"),
+            new PlanNodeId("1"),
+            (MeasurementPath) schemaMap.get("root.sg.d1.s2"),
             OrderBy.TIMESTAMP_DESC));
+    sourceNodeList1.forEach(
+        planNode -> {
+          ((SeriesScanNode) planNode).setTimeFilter(TimeFilter.gt(100));
+        });
 
     TimeJoinNode timeJoinNode1 =
-        new TimeJoinNode(new PlanNodeId("test_query_4"), OrderBy.TIMESTAMP_DESC, sourceNodeList1);
-    TimeJoinNode timeJoinNode2 =
-        new TimeJoinNode(new PlanNodeId("test_query_5"), OrderBy.TIMESTAMP_DESC, sourceNodeList2);
+        new TimeJoinNode(new PlanNodeId("2"), OrderBy.TIMESTAMP_DESC, sourceNodeList1);
 
     GreaterThanExpression timeFilter =
         new GreaterThanExpression(
@@ -671,99 +903,117 @@ public class QueryLogicalPlanUtil {
     GreaterThanExpression valueFilter1 =
         new GreaterThanExpression(
             new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
-            new ConstantOperand(TSDataType.INT32, "10"));
-    GreaterThanExpression valueFilter2 =
-        new GreaterThanExpression(
-            new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")),
-            new ConstantOperand(TSDataType.INT32, "10"));
-    LogicAndExpression expression =
-        new LogicAndExpression(
-            new LogicAndExpression(timeFilter, valueFilter1),
-            new LogicAndExpression(timeFilter, valueFilter2));
+            new ConstantOperand(TSDataType.INT64, "10"));
+    LogicAndExpression predicate1 = new LogicAndExpression(timeFilter, valueFilter1);
 
     FilterNode filterNode1 =
         new FilterNode(
-            new PlanNodeId("test_query_6"),
+            new PlanNodeId("3"),
             timeJoinNode1,
             new Expression[] {
               new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")),
               new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
             },
-            expression,
-            false,
-            ZoneId.systemDefault());
-    FilterNode filterNode2 =
-        new FilterNode(
-            new PlanNodeId("test_query_7"),
-            timeJoinNode2,
-            new Expression[] {
-              new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")),
-              new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")),
-            },
-            expression,
+            predicate1,
             false,
-            ZoneId.systemDefault());
+            ZonedDateTime.now().getOffset());
 
     AggregationNode aggregationNode1 =
         new AggregationNode(
-            new PlanNodeId("test_query_8"),
+            new PlanNodeId("4"),
             Collections.singletonList(filterNode1),
             Arrays.asList(
                 new AggregationDescriptor(
                     AggregationType.COUNT,
-                    AggregationStep.PARTIAL,
+                    AggregationStep.FINAL,
                     Collections.singletonList(
                         new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")))),
                 new AggregationDescriptor(
-                    AggregationType.LAST_VALUE,
-                    AggregationStep.PARTIAL,
+                    AggregationType.MAX_VALUE,
+                    AggregationStep.FINAL,
                     Collections.singletonList(
-                        new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")))),
+                        new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")))),
                 new AggregationDescriptor(
-                    AggregationType.MAX_VALUE,
-                    AggregationStep.PARTIAL,
+                    AggregationType.LAST_VALUE,
+                    AggregationStep.FINAL,
                     Collections.singletonList(
-                        new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2"))))));
+                        new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))))));
+
+    List<PlanNode> sourceNodeList2 = new ArrayList<>();
+    sourceNodeList2.add(
+        new SeriesScanNode(
+            new PlanNodeId("5"),
+            (MeasurementPath) schemaMap.get("root.sg.d2.s1"),
+            OrderBy.TIMESTAMP_DESC));
+    sourceNodeList2.add(
+        new SeriesScanNode(
+            new PlanNodeId("6"),
+            (MeasurementPath) schemaMap.get("root.sg.d2.s2"),
+            OrderBy.TIMESTAMP_DESC));
+    sourceNodeList2.forEach(
+        planNode -> {
+          ((SeriesScanNode) planNode).setTimeFilter(TimeFilter.gt(100));
+        });
+
+    TimeJoinNode timeJoinNode2 =
+        new TimeJoinNode(new PlanNodeId("7"), OrderBy.TIMESTAMP_DESC, sourceNodeList2);
+
+    GreaterThanExpression valueFilter2 =
+        new GreaterThanExpression(
+            new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")),
+            new ConstantOperand(TSDataType.INT32, "10"));
+    LogicAndExpression predicate2 = new LogicAndExpression(timeFilter, valueFilter2);
+
+    FilterNode filterNode2 =
+        new FilterNode(
+            new PlanNodeId("8"),
+            timeJoinNode2,
+            new Expression[] {
+              new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")),
+              new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")),
+            },
+            predicate2,
+            false,
+            ZonedDateTime.now().getOffset());
+
     AggregationNode aggregationNode2 =
         new AggregationNode(
-            new PlanNodeId("test_query_9"),
+            new PlanNodeId("9"),
             Collections.singletonList(filterNode2),
             Arrays.asList(
                 new AggregationDescriptor(
                     AggregationType.COUNT,
-                    AggregationStep.PARTIAL,
+                    AggregationStep.FINAL,
                     Collections.singletonList(
                         new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")))),
                 new AggregationDescriptor(
-                    AggregationType.LAST_VALUE,
-                    AggregationStep.PARTIAL,
+                    AggregationType.MAX_VALUE,
+                    AggregationStep.FINAL,
                     Collections.singletonList(
-                        new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")))),
+                        new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")))),
                 new AggregationDescriptor(
-                    AggregationType.MAX_VALUE,
-                    AggregationStep.PARTIAL,
+                    AggregationType.LAST_VALUE,
+                    AggregationStep.FINAL,
                     Collections.singletonList(
-                        new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2"))))));
+                        new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))))));
 
+    Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
+    deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 2, 3));
+    deviceToMeasurementIndexesMap.put("root.sg.d2", Arrays.asList(1, 2, 3));
     DeviceViewNode deviceViewNode =
         new DeviceViewNode(
-            new PlanNodeId("test_query_10"),
+            new PlanNodeId("10"),
             Arrays.asList(OrderBy.DEVICE_ASC, OrderBy.TIMESTAMP_DESC),
-            Arrays.asList("count(s1)", "max_value(s2)", "last_value(s1)"));
+            Arrays.asList(
+                HeaderConstant.COLUMN_DEVICE, "count(s1)", "max_value(s2)", "last_value(s1)"),
+            deviceToMeasurementIndexesMap);
     deviceViewNode.addChildDeviceNode("root.sg.d1", aggregationNode1);
     deviceViewNode.addChildDeviceNode("root.sg.d2", aggregationNode2);
 
-    FilterNullNode filterNullNode =
-        new FilterNullNode(
-            new PlanNodeId("test_query_11"),
-            deviceViewNode,
-            FilterNullPolicy.CONTAINS_NULL,
-            new ArrayList<>());
-
-    OffsetNode offsetNode = new OffsetNode(new PlanNodeId("test_query_12"), filterNullNode, 100);
-    LimitNode limitNode = new LimitNode(new PlanNodeId("test_query_13"), offsetNode, 100);
+    OffsetNode offsetNode = new OffsetNode(new PlanNodeId("11"), deviceViewNode, 100);
+    LimitNode limitNode = new LimitNode(new PlanNodeId("12"), offsetNode, 100);
 
-    //    querySQLs.add(sql);
-    //    sqlToPlanMap.put(sql, limitNode);
+    querySQLs.add(sql);
+    sqlToPlanMap.put(sql, limitNode);
   }
 }
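
A minimal sketch of the construction pattern the rewritten fixtures above now follow, using only the
constructor shapes visible in this diff (the allSensors set is gone, paths are passed as MeasurementPath,
and the trailing argument is the optional GROUP BY TIME parameter). Identifiers such as schemaMap come
from the test class; the PlanNodeId strings and index values below are illustrative only:

    // Sketch only: mirrors the constructor shapes shown in the diff above.
    MeasurementPath path = (MeasurementPath) schemaMap.get("root.sg.d1.s1");
    SeriesAggregationScanNode scanNode =
        new SeriesAggregationScanNode(
            new PlanNodeId("0"),
            path,
            Collections.singletonList(
                new AggregationDescriptor(
                    AggregationType.COUNT,
                    AggregationStep.SINGLE,
                    Collections.singletonList(new TimeSeriesOperand(path)))),
            OrderBy.TIMESTAMP_DESC,
            null); // no GROUP BY TIME clause in this sketch
    scanNode.setTimeFilter(TimeFilter.gt(100)); // the time predicate is attached via the setter

    // For ALIGN BY DEVICE plans, each device additionally maps to the indexes of its
    // measurement columns in the output header (values here are illustrative).
    Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
    deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 2, 3));
    DeviceViewNode deviceViewNode =
        new DeviceViewNode(
            new PlanNodeId("deviceView"),
            Arrays.asList(OrderBy.DEVICE_ASC, OrderBy.TIMESTAMP_DESC),
            Arrays.asList(
                HeaderConstant.COLUMN_DEVICE, "count(s1)", "max_value(s2)", "last_value(s1)"),
            deviceToMeasurementIndexesMap);
    deviceViewNode.addChildDeviceNode("root.sg.d1", scanNode); // child plan producing d1's rows
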
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/AggregationNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/AggregationNodeSerdeTest.java
index e93fcf1fb4..e1ead56759 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/AggregationNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/AggregationNodeSerdeTest.java
@@ -63,7 +63,6 @@ public class AggregationNodeSerdeTest {
                     AggregationStep.INTERMEDIATE,
                     Collections.singletonList(
                         new TimeSeriesOperand(new PartialPath("root.sg.d1.s1"))))),
-            Sets.newHashSet("s1"),
             OrderBy.TIMESTAMP_ASC,
             new In<>(Sets.newHashSet("s1", "s2"), VALUE_FILTER, true),
             groupByTimeParameter,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/DeviceViewNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/DeviceViewNodeSerdeTest.java
index d2fa325a0c..b8ed4919fa 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/DeviceViewNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/DeviceViewNodeSerdeTest.java
@@ -29,6 +29,7 @@ import org.junit.Test;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.HashMap;
 
 import static org.junit.Assert.assertEquals;
 
@@ -43,7 +44,8 @@ public class DeviceViewNodeSerdeTest {
         new DeviceViewNode(
             new PlanNodeId("TestDeviceMergeNode"),
             Arrays.asList(OrderBy.DEVICE_ASC, OrderBy.TIMESTAMP_ASC),
-            Arrays.asList("s1", "s2"));
+            Arrays.asList("s1", "s2"),
+            new HashMap<>());
     deviceViewNode.addChildDeviceNode("root.sg.d1", timeJoinNode1);
     deviceViewNode.addChildDeviceNode("root.sg.d2", timeJoinNode2);
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByLevelNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByLevelNodeSerdeTest.java
index 931183c13a..7e94ffff86 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByLevelNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByLevelNodeSerdeTest.java
@@ -36,7 +36,6 @@ import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
-import org.apache.commons.compress.utils.Sets;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
@@ -62,7 +61,6 @@ public class GroupByLevelNodeSerdeTest {
                     AggregationStep.FINAL,
                     Collections.singletonList(
                         new TimeSeriesOperand(new PartialPath("root.sg.d1.s1"))))),
-            Sets.newHashSet("s1"),
             OrderBy.TIMESTAMP_ASC,
             null,
             groupByTimeParameter,
@@ -78,7 +76,6 @@ public class GroupByLevelNodeSerdeTest {
                     AggregationStep.FINAL,
                     Collections.singletonList(
                         new TimeSeriesOperand(new PartialPath("root.sg.d2.s1"))))),
-            Sets.newHashSet("s1"),
             OrderBy.TIMESTAMP_ASC,
             null,
             groupByTimeParameter,
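
As the rewritten QueryLogicalPlanUtil fixtures show, the FINAL-step descriptors fed into GroupByLevelNode
now wrap their inputs in FunctionExpression rather than raw TimeSeriesOperand. A hedged sketch of one such
descriptor (the path and variable name are illustrative):

    // Sketch only: input expression follows the FunctionExpression pattern used in the diff above.
    AggregationDescriptor countFinal =
        new AggregationDescriptor(
            AggregationType.COUNT,
            AggregationStep.FINAL,
            Collections.singletonList(
                new FunctionExpression(
                    SQLConstant.COUNT,
                    new LinkedHashMap<>(), // no function attributes
                    Collections.singletonList(
                        new TimeSeriesOperand(new PartialPath("root.sg.d1.s1"))))));
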
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/LimitNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/LimitNodeSerdeTest.java
index bf263094e3..42348c2911 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/LimitNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/LimitNodeSerdeTest.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
 
-import com.google.common.collect.Sets;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
@@ -48,7 +47,6 @@ public class LimitNodeSerdeTest {
         new SeriesScanNode(
             new PlanNodeId("TestSeriesScanNode"),
             new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
-            Sets.newHashSet("s1"),
             OrderBy.TIMESTAMP_DESC,
             new GroupByFilter(1, 2, 3, 4),
             null,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/OffsetNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/OffsetNodeSerdeTest.java
index 7b506f5946..0f410299d8 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/OffsetNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/OffsetNodeSerdeTest.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
 
-import com.google.common.collect.Sets;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
@@ -47,7 +46,6 @@ public class OffsetNodeSerdeTest {
         new SeriesScanNode(
             new PlanNodeId("TestSeriesScanNode"),
             new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
-            Sets.newHashSet("s1"),
             OrderBy.TIMESTAMP_DESC,
             new GroupByFilter(1, 2, 3, 4),
             null,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/SortNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/SortNodeSerdeTest.java
index 76ec69aaae..66b4943736 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/SortNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/SortNodeSerdeTest.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
 
-import com.google.common.collect.Sets;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
@@ -47,7 +46,6 @@ public class SortNodeSerdeTest {
         new SeriesScanNode(
             new PlanNodeId("TestSeriesScanNode"),
             new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
-            Sets.newHashSet("s1"),
             OrderBy.TIMESTAMP_DESC,
             new GroupByFilter(1, 2, 3, 4),
             null,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/TimeJoinNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/TimeJoinNodeSerdeTest.java
index 82e5283ccf..96224b8873 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/TimeJoinNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/TimeJoinNodeSerdeTest.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.read.filter.ValueFilter;
 
-import com.google.common.collect.Sets;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
@@ -47,7 +46,6 @@ public class TimeJoinNodeSerdeTest {
         new SeriesScanNode(
             new PlanNodeId("TestSeriesScanNode"),
             new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
-            Sets.newHashSet("s1", "s2"),
             OrderBy.TIMESTAMP_DESC,
             TimeFilter.gt(100),
             null,
@@ -59,7 +57,6 @@ public class TimeJoinNodeSerdeTest {
         new SeriesScanNode(
             new PlanNodeId("TestSeriesScanNode"),
             new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
-            Sets.newHashSet("s1", "s2"),
             OrderBy.TIMESTAMP_DESC,
             null,
             ValueFilter.gt(100),
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/source/SeriesAggregationScanNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/source/SeriesAggregationScanNodeSerdeTest.java
index 1ea22b4e2f..072cf1a0a4 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/source/SeriesAggregationScanNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/source/SeriesAggregationScanNodeSerdeTest.java
@@ -64,7 +64,6 @@ public class SeriesAggregationScanNodeSerdeTest {
             new PlanNodeId("TestSeriesAggregateScanNode"),
             new MeasurementPath("root.sg.d1.s1", TSDataType.BOOLEAN),
             aggregationDescriptorList,
-            Sets.newHashSet("s1"),
             OrderBy.TIMESTAMP_ASC,
             new In<>(Sets.newHashSet("s1", "s2"), VALUE_FILTER, true),
             groupByTimeParameter,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/source/SeriesScanNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/source/SeriesScanNodeSerdeTest.java
index a40fa7c997..0ce1190d11 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/source/SeriesScanNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/source/SeriesScanNodeSerdeTest.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
 
-import com.google.common.collect.Sets;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
@@ -47,7 +46,6 @@ public class SeriesScanNodeSerdeTest {
         new SeriesScanNode(
             new PlanNodeId("TestSeriesScanNode"),
             new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
-            Sets.newHashSet("s1"),
             OrderBy.TIMESTAMP_DESC,
             new GroupByFilter(1, 2, 3, 4),
             null,
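
Finally, for the scan-node serde tests touched above, the slimmed-down construction (without the removed
sensor set) reduces to the three-argument form plus setters, as used throughout the rewritten
QueryLogicalPlanUtil. A minimal sketch, with the node id and filter chosen only for illustration:

    // Sketch only: three-argument constructor plus setter, matching the pattern in the diff above.
    SeriesScanNode seriesScanNode =
        new SeriesScanNode(
            new PlanNodeId("TestSeriesScanNode"),
            new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
            OrderBy.TIMESTAMP_DESC);
    seriesScanNode.setTimeFilter(TimeFilter.gt(100)); // time predicate supplied via setter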