You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2024/04/23 16:09:23 UTC

(iotdb) 01/03: finish

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/AggregationPushDown
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit edb5a4950ad3a163cfe378b0cb1a5aef2c5526b8
Author: liuminghui233 <54...@qq.com>
AuthorDate: Tue Apr 23 23:56:10 2024 +0800

    finish
---
 .../plan/optimization/AggregationPushDown.java     | 303 ++++++++++++++++
 .../plan/planner/LogicalPlanBuilder.java           | 397 +--------------------
 .../plan/planner/LogicalPlanVisitor.java           | 181 +++-------
 .../queryengine/plan/planner/LogicalPlanner.java   |   6 +-
 .../plan/planner/OperatorTreeGenerator.java        | 248 +++++++------
 .../planner/distribution/ExchangeNodeAdder.java    |   6 +
 .../plan/planner/plan/node/PlanNodeType.java       |   5 +
 .../plan/planner/plan/node/PlanVisitor.java        |   5 +
 .../plan/node/process/RawDataAggregationNode.java  | 322 +++++++++++++++++
 9 files changed, 828 insertions(+), 645 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
new file mode 100644
index 00000000000..77d13471a95
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.optimization;
+
+import org.apache.iotdb.common.rpc.thrift.TAggregationType;
+import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationSourceNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanSourceNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.schemaengine.schemaregion.utils.MetaUtils;
+import org.apache.iotdb.db.utils.SchemaUtils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME;
+
+public class AggregationPushDown implements PlanOptimizer {
+
+  @Override
+  public PlanNode optimize(PlanNode plan, Analysis analysis, MPPQueryContext context) {
+    if (analysis.getStatement().getType() == StatementType.QUERY) {
+      QueryStatement statement = analysis.getQueryStatement();
+      // Rewrite only ungrouped or GROUP BY TIME aggregations that pass the statistics check.
+      if (statement.isAggregationQuery()
+          && (!statement.isGroupBy() || statement.isGroupByTime())
+          && !cannotUseStatistics(statement, analysis)) {
+        return plan.accept(
+            new Rewriter(), new RewriterContext(analysis, context, statement.isAlignByDevice()));
+      }
+    }
+    return plan;
+  }
+
+  private boolean cannotUseStatistics(QueryStatement queryStatement, Analysis analysis) {
+    if (!queryStatement.isAlignByDevice()) {
+      return cannotUseStatistics(
+          analysis.getAggregationExpressions(), analysis.getSourceTransformExpressions());
+    }
+    // ALIGN BY DEVICE: only the first device in the device list is inspected, and its
+    // expressions decide the answer for the whole query.
+    String firstDevice = analysis.getDeviceList().get(0).getDevice();
+    Set<Expression> aggregations = analysis.getDeviceToAggregationExpressions().get(firstDevice);
+    Set<Expression> sourceTransforms =
+        analysis.getDeviceToSourceTransformExpressions().get(firstDevice);
+    return cannotUseStatistics(aggregations, sourceTransforms);
+  }
+
+  private boolean cannotUseStatistics(
+      Set<Expression> aggregationExpressions, Set<Expression> sourceTransformExpressions) {
+    for (Expression expression : aggregationExpressions) {
+      if (!(expression instanceof FunctionExpression)) {
+        throw new IllegalArgumentException(
+            String.format("Invalid Aggregation Expression: %s", expression.getExpressionString()));
+      }
+      FunctionExpression aggregation = (FunctionExpression) expression;
+      // Disable statistics optimization of UDAF for now
+      if (aggregation.isExternalAggregationFunctionExpression()) {
+        return true;
+      }
+      String functionName = aggregation.getFunctionName();
+      if (!COUNT_TIME.equalsIgnoreCase(functionName)) {
+        if (!BuiltinAggregationFunction.canUseStatistics(functionName)) {
+          return true;
+        }
+        continue;
+      }
+      // count_time from only one aligned device can use AlignedSeriesAggScan
+      String alignedDeviceId = "";
+      for (Expression source : sourceTransformExpressions) {
+        PartialPath path = ((TimeSeriesOperand) source).getPath();
+        boolean aligned =
+            path instanceof AlignedPath || ((MeasurementPath) path).isUnderAlignedEntity();
+        if (!aligned) {
+          return true;
+        }
+        String device = path.getDevice();
+        if (StringUtils.isEmpty(alignedDeviceId)) {
+          alignedDeviceId = device;
+        } else if (!alignedDeviceId.equalsIgnoreCase(device)) {
+          return true;
+        }
+      }
+      return false;
+    }
+    return false;
+  }
+
+  /**
+   * Plan visitor that replaces a {@link RawDataAggregationNode} placed directly above raw series
+   * scans with aggregation scan nodes; process nodes on the way down are updated in place.
+   */
+  private static class Rewriter extends PlanVisitor<PlanNode, RewriterContext> {
+
+    /** Rejects every node that is routed to this method. */
+    @Override
+    public PlanNode visitPlan(PlanNode node, RewriterContext context) {
+      throw new IllegalArgumentException("Unexpected plan node: " + node);
+    }
+
+    /** Rewrites the only child, re-attaches it to {@code node} and returns {@code node}. */
+    @Override
+    public PlanNode visitSingleChildProcess(SingleChildProcessNode node, RewriterContext context) {
+      node.setChild(node.getChild().accept(this, context));
+      return node;
+    }
+
+    /** Rewrites each child in order and re-attaches the results to {@code node}. */
+    @Override
+    public PlanNode visitMultiChildProcess(MultiChildProcessNode node, RewriterContext context) {
+      List<PlanNode> rewrittenChildren = new ArrayList<>();
+      for (PlanNode child : node.getChildren()) {
+        rewrittenChildren.add(child.accept(this, context));
+      }
+      node.setChildren(rewrittenChildren);
+      return node;
+    }
+
+    /**
+     * Replaces a raw-data aggregation over plain series scans with aggregation scan nodes.
+     *
+     * <p>Push-down happens only when the child is a {@link FullOuterTimeJoinNode} or a {@link
+     * SeriesScanSourceNode}; for any other child {@code node} is returned untouched.
+     *
+     * <p>Descriptors are grouped per source path. Without GROUP BY TIME, those that disagree
+     * with the scan order are served by separate scans running in the reverse order.
+     *
+     * @param node the aggregation node to rewrite
+     * @param context supplies fresh plan node ids and the align-by-device flag
+     * @return the rewritten sub-plan, or {@code node} when push-down is not possible
+     */
+    @Override
+    public PlanNode visitRawDataAggregation(RawDataAggregationNode node, RewriterContext context) {
+      PlanNode child = node.getChild();
+      if (!(child instanceof FullOuterTimeJoinNode || child instanceof SeriesScanSourceNode)) {
+        // cannot push down
+        return node;
+      }
+      // TODO: a SeriesScanSourceNode child means only one source; check partition here.
+
+      Ordering scanOrder = node.getScanOrder();
+      GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+      // Without GROUP BY TIME, aggregations that disagree with the scan order get reverse-order
+      // scans further down. With GROUP BY TIME no such scans are built, so every aggregation
+      // must land in the forward group; otherwise it would silently vanish from the plan.
+      boolean needCheckAscending = groupByTimeParameter == null;
+      Map<PartialPath, List<AggregationDescriptor>> ascending = new HashMap<>();
+      Map<PartialPath, List<AggregationDescriptor>> descending = new HashMap<>();
+      Map<PartialPath, List<AggregationDescriptor>> countTime = new HashMap<>();
+      for (AggregationDescriptor descriptor : node.getAggregationDescriptorList()) {
+        checkArgument(
+            descriptor.getInputExpressions().size() == 1
+                && descriptor.getInputExpressions().get(0) instanceof TimeSeriesOperand);
+        PartialPath path =
+            ((TimeSeriesOperand) descriptor.getInputExpressions().get(0)).getPath();
+        Map<PartialPath, List<AggregationDescriptor>> target;
+        if (descriptor.getAggregationType().equals(TAggregationType.COUNT_TIME)) {
+          target = countTime;
+        } else if (!needCheckAscending
+            || SchemaUtils.isConsistentWithScanOrder(
+                descriptor.getAggregationType(), scanOrder)) {
+          target = ascending;
+        } else {
+          target = descending;
+        }
+        target.computeIfAbsent(path, key -> new ArrayList<>()).add(descriptor);
+      }
+
+      List<PlanNode> sourceNodeList = new ArrayList<>();
+      // count_time entries, when present, are used in place of the grouped forward entries.
+      Map<PartialPath, List<AggregationDescriptor>> forward =
+          countTime.isEmpty() ? MetaUtils.groupAlignedAggregations(ascending) : countTime;
+      for (Map.Entry<PartialPath, List<AggregationDescriptor>> entry : forward.entrySet()) {
+        sourceNodeList.add(
+            createAggregationScanNode(
+                entry.getKey(), entry.getValue(), scanOrder, groupByTimeParameter, context));
+      }
+      if (needCheckAscending) {
+        Map<PartialPath, List<AggregationDescriptor>> backward =
+            MetaUtils.groupAlignedAggregations(descending);
+        for (Map.Entry<PartialPath, List<AggregationDescriptor>> entry : backward.entrySet()) {
+          sourceNodeList.add(
+              createAggregationScanNode(
+                  entry.getKey(), entry.getValue(), scanOrder.reverse(), null, context));
+        }
+      }
+
+      PlanNode resultNode = convergeWithTimeJoin(sourceNodeList, scanOrder, context);
+      // The rewritten sub-plan stands in for node, so node's own output columns (not those of
+      // its raw-data child) are what an align-by-device projection has to restore.
+      return planProject(resultNode, node, context);
+    }
+
+    /** Builds the aggregation scan node matching the concrete type of {@code selectPath}. */
+    private SeriesAggregationSourceNode createAggregationScanNode(
+        PartialPath selectPath,
+        List<AggregationDescriptor> aggregationDescriptorList,
+        Ordering scanOrder,
+        GroupByTimeParameter groupByTimeParameter,
+        RewriterContext context) {
+      if (selectPath instanceof MeasurementPath) { // non-aligned series
+        return new SeriesAggregationScanNode(
+            context.genPlanNodeId(),
+            (MeasurementPath) selectPath,
+            aggregationDescriptorList,
+            scanOrder,
+            groupByTimeParameter);
+      }
+      if (selectPath instanceof AlignedPath) { // aligned series
+        return new AlignedSeriesAggregationScanNode(
+            context.genPlanNodeId(),
+            (AlignedPath) selectPath,
+            aggregationDescriptorList,
+            scanOrder,
+            groupByTimeParameter);
+      }
+      throw new IllegalArgumentException("unexpected path type");
+    }
+
+    /** Returns a lone source as-is; otherwise wraps all sources in one time join node. */
+    private PlanNode convergeWithTimeJoin(
+        List<PlanNode> sourceNodes, Ordering mergeOrder, RewriterContext context) {
+      if (sourceNodes.size() == 1) {
+        return sourceNodes.get(0);
+      }
+      return new FullOuterTimeJoinNode(context.genPlanNodeId(), mergeOrder, sourceNodes);
+    }
+
+    /** Adds a ProjectNode restoring rawNode's columns for align-by-device plans that need it. */
+    private PlanNode planProject(PlanNode resultNode, PlanNode rawNode, RewriterContext context) {
+      if (!context.isAlignByDevice()
+          || rawNode.getOutputColumnNames().equals(resultNode.getOutputColumnNames())) {
+        return resultNode;
+      }
+      return new ProjectNode(context.genPlanNodeId(), resultNode, rawNode.getOutputColumnNames());
+    }
+  }
+
+  /** State shared across one rewrite: the id source for new nodes and the align-by-device flag. */
+  private static class RewriterContext {
+    private final boolean isAlignByDevice;
+    private final QueryId queryId;
+
+    public RewriterContext(Analysis analysis, MPPQueryContext context, boolean isAlignByDevice) {
+      this.isAlignByDevice = isAlignByDevice;
+      this.queryId = context.getQueryId();
+    }
+
+    public boolean isAlignByDevice() {
+      return this.isAlignByDevice;
+    }
+
+    public PlanNodeId genPlanNodeId() {
+      return this.queryId.genPlanNodeId();
+    }
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index 15b890cb2ec..1f4dbb34b07 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TAggregationType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSchemaNode;
 import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
@@ -56,19 +55,18 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.Sche
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryOrderByHeatNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesCountNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewIntoNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FillNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByLevelNode;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByTagNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.IntoNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
@@ -78,11 +76,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullO
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationSourceNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
@@ -103,8 +98,6 @@ import org.apache.iotdb.db.schemaengine.template.Template;
 import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.iotdb.db.utils.columngenerator.parameter.SlidingTimeColumnGeneratorParameter;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
 import org.apache.commons.lang3.Validate;
 
@@ -113,7 +106,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
@@ -123,14 +115,12 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.DEVICE;
 import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME;
 import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionTypeAnalyzer.analyzeExpression;
 import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode.LIMIT_VALUE_USE_TOP_K;
 import static org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy.LINEAR;
-import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME;
 import static org.apache.iotdb.db.utils.constant.SqlConstant.LAST_VALUE;
 import static org.apache.iotdb.db.utils.constant.SqlConstant.MAX_TIME;
 
@@ -378,7 +368,7 @@ public class LogicalPlanBuilder {
               .planRawDataSource(
                   sourceExpressions, Ordering.DESC, 0, 0, analysis.isLastLevelUseWildcard())
               .planWhereAndSourceTransform(null, sourceTransformExpressions, false, Ordering.DESC)
-              .planAggregation(
+              .planRawDataAggregation(
                   new LinkedHashSet<>(Arrays.asList(maxTimeAgg, lastValueAgg)),
                   null,
                   analysis.getGroupByTimeParameter(),
@@ -397,304 +387,6 @@ public class LogicalPlanBuilder {
     }
   }
 
-  public LogicalPlanBuilder planAggregationSource(
-      AggregationStep curStep,
-      Ordering scanOrder,
-      GroupByTimeParameter groupByTimeParameter,
-      Set<Expression> aggregationExpressions,
-      Set<Expression> sourceTransformExpressions,
-      Map<Expression, Set<Expression>> crossGroupByAggregations,
-      List<String> tagKeys,
-      Map<List<String>, LinkedHashMap<Expression, List<Expression>>>
-          tagValuesToGroupedTimeseriesOperands) {
-    boolean needCheckAscending = groupByTimeParameter == null;
-    Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new HashMap<>();
-    Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new HashMap<>();
-    Map<PartialPath, List<AggregationDescriptor>> countTimeAggregations = new HashMap<>();
-    for (Expression aggregationExpression : aggregationExpressions) {
-      createAggregationDescriptor(
-          (FunctionExpression) aggregationExpression,
-          curStep,
-          scanOrder,
-          needCheckAscending,
-          ascendingAggregations,
-          descendingAggregations,
-          countTimeAggregations);
-    }
-
-    List<PlanNode> sourceNodeList =
-        constructSourceNodeFromAggregationDescriptors(
-            ascendingAggregations,
-            descendingAggregations,
-            countTimeAggregations,
-            scanOrder,
-            groupByTimeParameter);
-    updateTypeProvider(aggregationExpressions);
-    updateTypeProvider(sourceTransformExpressions);
-
-    return convergeAggregationSource(
-        sourceNodeList,
-        curStep,
-        scanOrder,
-        groupByTimeParameter,
-        aggregationExpressions,
-        crossGroupByAggregations,
-        tagKeys,
-        tagValuesToGroupedTimeseriesOperands);
-  }
-
-  public LogicalPlanBuilder planAggregationSourceWithIndexAdjust(
-      AggregationStep curStep,
-      Ordering scanOrder,
-      GroupByTimeParameter groupByTimeParameter,
-      Set<Expression> aggregationExpressions,
-      Set<Expression> sourceTransformExpressions,
-      Map<Expression, Set<Expression>> crossGroupByExpressions,
-      List<Integer> deviceViewInputIndexes,
-      boolean outputEndTime) {
-    checkArgument(
-        aggregationExpressions.size() <= deviceViewInputIndexes.size(),
-        "Each aggregate should correspond to a column of output.");
-
-    boolean needCheckAscending = groupByTimeParameter == null;
-    Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new HashMap<>();
-    Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new HashMap<>();
-    Map<AggregationDescriptor, Integer> aggregationToIndexMap = new HashMap<>();
-    Map<PartialPath, List<AggregationDescriptor>> countTimeAggregations = new HashMap<>();
-
-    // If need output endTime, the first index is used by __endTime
-    int index = outputEndTime ? 1 : 0;
-    for (Expression aggregationExpression : aggregationExpressions) {
-      AggregationDescriptor aggregationDescriptor =
-          createAggregationDescriptor(
-              (FunctionExpression) aggregationExpression,
-              curStep,
-              scanOrder,
-              needCheckAscending,
-              ascendingAggregations,
-              descendingAggregations,
-              countTimeAggregations);
-      aggregationToIndexMap.put(aggregationDescriptor, deviceViewInputIndexes.get(index));
-      index++;
-    }
-
-    List<PlanNode> sourceNodeList =
-        constructSourceNodeFromAggregationDescriptors(
-            ascendingAggregations,
-            descendingAggregations,
-            countTimeAggregations,
-            scanOrder,
-            groupByTimeParameter);
-    updateTypeProvider(aggregationExpressions);
-    updateTypeProvider(sourceTransformExpressions);
-
-    if (!curStep.isOutputPartial()) {
-      // update measurementIndexes
-      deviceViewInputIndexes.clear();
-      if (outputEndTime) {
-        deviceViewInputIndexes.add(1);
-      }
-      deviceViewInputIndexes.addAll(
-          sourceNodeList.stream()
-              .map(
-                  planNode ->
-                      ((SeriesAggregationSourceNode) planNode).getAggregationDescriptorList())
-              .flatMap(List::stream)
-              .map(aggregationToIndexMap::get)
-              .collect(Collectors.toList()));
-    }
-
-    return convergeAggregationSource(
-        sourceNodeList,
-        curStep,
-        scanOrder,
-        groupByTimeParameter,
-        aggregationExpressions,
-        crossGroupByExpressions,
-        null,
-        null);
-  }
-
-  private AggregationDescriptor createAggregationDescriptor(
-      FunctionExpression sourceExpression,
-      AggregationStep curStep,
-      Ordering scanOrder,
-      boolean needCheckAscending,
-      Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations,
-      Map<PartialPath, List<AggregationDescriptor>> descendingAggregations,
-      Map<PartialPath, List<AggregationDescriptor>> countTimeAggregations) {
-    AggregationDescriptor aggregationDescriptor =
-        new AggregationDescriptor(
-            sourceExpression.getFunctionName(),
-            curStep,
-            sourceExpression.getExpressions(),
-            sourceExpression.getFunctionAttributes());
-    if (curStep.isOutputPartial()) {
-      updateTypeProviderByPartialAggregation(aggregationDescriptor, context.getTypeProvider());
-    }
-
-    if (COUNT_TIME.equalsIgnoreCase(sourceExpression.getFunctionName())) {
-      Map<String, Pair<List<String>, List<IMeasurementSchema>>> map = new HashMap<>();
-      for (Expression expression : sourceExpression.getCountTimeExpressions()) {
-        TimeSeriesOperand ts = (TimeSeriesOperand) expression;
-        PartialPath path = ts.getPath();
-        Pair<List<String>, List<IMeasurementSchema>> pair =
-            map.computeIfAbsent(
-                path.getDevice(), k -> new Pair<>(new ArrayList<>(), new ArrayList<>()));
-        pair.left.add(path.getMeasurement());
-        try {
-          pair.right.add(path.getMeasurementSchema());
-        } catch (MetadataException ex) {
-          throw new RuntimeException(ex);
-        }
-      }
-
-      for (Map.Entry<String, Pair<List<String>, List<IMeasurementSchema>>> entry : map.entrySet()) {
-        String device = entry.getKey();
-        Pair<List<String>, List<IMeasurementSchema>> pair = entry.getValue();
-        AlignedPath alignedPath = null;
-        try {
-          alignedPath = new AlignedPath(device, pair.left, pair.right);
-        } catch (IllegalPathException e) {
-          throw new RuntimeException(e);
-        }
-        countTimeAggregations.put(alignedPath, Collections.singletonList(aggregationDescriptor));
-      }
-
-      return aggregationDescriptor;
-    }
-
-    PartialPath selectPath =
-        ((TimeSeriesOperand) sourceExpression.getExpressions().get(0)).getPath();
-    if (!needCheckAscending
-        || SchemaUtils.isConsistentWithScanOrder(
-            aggregationDescriptor.getAggregationType(), scanOrder)) {
-      ascendingAggregations
-          .computeIfAbsent(selectPath, key -> new ArrayList<>())
-          .add(aggregationDescriptor);
-    } else {
-      descendingAggregations
-          .computeIfAbsent(selectPath, key -> new ArrayList<>())
-          .add(aggregationDescriptor);
-    }
-    return aggregationDescriptor;
-  }
-
-  private List<PlanNode> constructSourceNodeFromAggregationDescriptors(
-      Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations,
-      Map<PartialPath, List<AggregationDescriptor>> descendingAggregations,
-      Map<PartialPath, List<AggregationDescriptor>> countTimeAggregations,
-      Ordering scanOrder,
-      GroupByTimeParameter groupByTimeParameter) {
-
-    List<PlanNode> sourceNodeList = new ArrayList<>();
-    boolean needCheckAscending = groupByTimeParameter == null;
-    Map<PartialPath, List<AggregationDescriptor>> groupedAscendingAggregations = null;
-    if (!countTimeAggregations.isEmpty()) {
-      groupedAscendingAggregations = countTimeAggregations;
-    } else {
-      groupedAscendingAggregations = MetaUtils.groupAlignedAggregations(ascendingAggregations);
-    }
-
-    for (Map.Entry<PartialPath, List<AggregationDescriptor>> pathAggregationsEntry :
-        groupedAscendingAggregations.entrySet()) {
-      sourceNodeList.add(
-          createAggregationScanNode(
-              pathAggregationsEntry.getKey(),
-              pathAggregationsEntry.getValue(),
-              scanOrder,
-              groupByTimeParameter));
-    }
-
-    if (needCheckAscending) {
-      Map<PartialPath, List<AggregationDescriptor>> groupedDescendingAggregations =
-          MetaUtils.groupAlignedAggregations(descendingAggregations);
-      for (Map.Entry<PartialPath, List<AggregationDescriptor>> pathAggregationsEntry :
-          groupedDescendingAggregations.entrySet()) {
-        sourceNodeList.add(
-            createAggregationScanNode(
-                pathAggregationsEntry.getKey(),
-                pathAggregationsEntry.getValue(),
-                scanOrder.reverse(),
-                null));
-      }
-    }
-    return sourceNodeList;
-  }
-
-  private LogicalPlanBuilder convergeAggregationSource(
-      List<PlanNode> sourceNodeList,
-      AggregationStep curStep,
-      Ordering scanOrder,
-      GroupByTimeParameter groupByTimeParameter,
-      Set<Expression> aggregationExpressions,
-      Map<Expression, Set<Expression>> crossGroupByExpressions,
-      List<String> tagKeys,
-      Map<List<String>, LinkedHashMap<Expression, List<Expression>>>
-          tagValuesToGroupedTimeseriesOperands) {
-    if (curStep.isOutputPartial()) {
-      if (groupByTimeParameter != null && groupByTimeParameter.hasOverlap()) {
-        curStep =
-            crossGroupByExpressions != null ? AggregationStep.INTERMEDIATE : AggregationStep.FINAL;
-
-        this.root = convergeWithTimeJoin(sourceNodeList, scanOrder);
-
-        this.root =
-            createSlidingWindowAggregationNode(
-                this.getRoot(), aggregationExpressions, groupByTimeParameter, curStep, scanOrder);
-
-        if (crossGroupByExpressions != null) {
-          curStep = AggregationStep.FINAL;
-          if (tagKeys != null) {
-            this.root =
-                createGroupByTagNode(
-                    tagKeys,
-                    tagValuesToGroupedTimeseriesOperands,
-                    crossGroupByExpressions.keySet(),
-                    Collections.singletonList(this.getRoot()),
-                    curStep,
-                    groupByTimeParameter,
-                    scanOrder);
-          } else {
-            this.root =
-                createGroupByTLevelNode(
-                    Collections.singletonList(this.getRoot()),
-                    crossGroupByExpressions,
-                    curStep,
-                    groupByTimeParameter,
-                    scanOrder);
-          }
-        }
-      } else {
-        if (tagKeys != null) {
-          curStep = AggregationStep.FINAL;
-          this.root =
-              createGroupByTagNode(
-                  tagKeys,
-                  tagValuesToGroupedTimeseriesOperands,
-                  crossGroupByExpressions.keySet(),
-                  sourceNodeList,
-                  curStep,
-                  groupByTimeParameter,
-                  scanOrder);
-        } else if (crossGroupByExpressions != null) {
-          curStep = AggregationStep.FINAL;
-          this.root =
-              createGroupByTLevelNode(
-                  sourceNodeList,
-                  crossGroupByExpressions,
-                  curStep,
-                  groupByTimeParameter,
-                  scanOrder);
-        }
-      }
-    } else {
-      this.root = convergeWithTimeJoin(sourceNodeList, scanOrder);
-    }
-
-    return this;
-  }
-
   public static void updateTypeProviderByPartialAggregation(
       AggregationDescriptor aggregationDescriptor, TypeProvider typeProvider) {
     if (aggregationDescriptor.getAggregationType() == TAggregationType.UDAF) {
@@ -977,7 +669,7 @@ public class LogicalPlanBuilder {
     return this;
   }
 
-  public LogicalPlanBuilder planAggregation(
+  public LogicalPlanBuilder planRawDataAggregation(
       Set<Expression> aggregationExpressions,
       Expression groupByExpression,
       GroupByTimeParameter groupByTimeParameter,
@@ -999,9 +691,9 @@ public class LogicalPlanBuilder {
                   aggregationDescriptor, context.getTypeProvider()));
     }
     this.root =
-        new AggregationNode(
+        new RawDataAggregationNode(
             context.getQueryId().genPlanNodeId(),
-            Collections.singletonList(this.getRoot()),
+            this.getRoot(),
             aggregationDescriptorList,
             groupByTimeParameter,
             groupByParameter,
@@ -1076,85 +768,6 @@ public class LogicalPlanBuilder {
         scanOrder);
   }
 
-  private PlanNode createGroupByTagNode(
-      List<String> tagKeys,
-      Map<List<String>, LinkedHashMap<Expression, List<Expression>>>
-          tagValuesToGroupedTimeseriesOperands,
-      Collection<Expression> groupByTagOutputExpressions,
-      List<PlanNode> children,
-      AggregationStep curStep,
-      GroupByTimeParameter groupByTimeParameter,
-      Ordering scanOrder) {
-    Map<List<String>, List<CrossSeriesAggregationDescriptor>> tagValuesToAggregationDescriptors =
-        new HashMap<>();
-    for (List<String> tagValues : tagValuesToGroupedTimeseriesOperands.keySet()) {
-      LinkedHashMap<Expression, List<Expression>> groupedTimeseriesOperands =
-          tagValuesToGroupedTimeseriesOperands.get(tagValues);
-      List<CrossSeriesAggregationDescriptor> aggregationDescriptors = new ArrayList<>();
-
-      // Bind an AggregationDescriptor for each GroupByTagOutputExpression
-      for (Expression groupByTagOutputExpression : groupByTagOutputExpressions) {
-        boolean added = false;
-        for (Expression expression : groupedTimeseriesOperands.keySet()) {
-          if (expression.equals(groupByTagOutputExpression)) {
-            String functionName = ((FunctionExpression) expression).getFunctionName();
-            CrossSeriesAggregationDescriptor aggregationDescriptor =
-                new CrossSeriesAggregationDescriptor(
-                    functionName,
-                    curStep,
-                    groupedTimeseriesOperands.get(expression),
-                    ((FunctionExpression) expression).getFunctionAttributes(),
-                    expression.getExpressions());
-            aggregationDescriptors.add(aggregationDescriptor);
-            added = true;
-            break;
-          }
-        }
-        if (!added) {
-          aggregationDescriptors.add(null);
-        }
-      }
-      tagValuesToAggregationDescriptors.put(tagValues, aggregationDescriptors);
-    }
-
-    updateTypeProvider(groupByTagOutputExpressions);
-    updateTypeProviderWithConstantType(tagKeys, TSDataType.TEXT);
-    return new GroupByTagNode(
-        context.getQueryId().genPlanNodeId(),
-        children,
-        groupByTimeParameter,
-        scanOrder,
-        tagKeys,
-        tagValuesToAggregationDescriptors,
-        groupByTagOutputExpressions.stream()
-            .map(Expression::getExpressionString)
-            .collect(Collectors.toList()));
-  }
-
-  private SeriesAggregationSourceNode createAggregationScanNode(
-      PartialPath selectPath,
-      List<AggregationDescriptor> aggregationDescriptorList,
-      Ordering scanOrder,
-      GroupByTimeParameter groupByTimeParameter) {
-    if (selectPath instanceof MeasurementPath) { // non-aligned series
-      return new SeriesAggregationScanNode(
-          context.getQueryId().genPlanNodeId(),
-          (MeasurementPath) selectPath,
-          aggregationDescriptorList,
-          scanOrder,
-          groupByTimeParameter);
-    } else if (selectPath instanceof AlignedPath) { // aligned series
-      return new AlignedSeriesAggregationScanNode(
-          context.getQueryId().genPlanNodeId(),
-          (AlignedPath) selectPath,
-          aggregationDescriptorList,
-          scanOrder,
-          groupByTimeParameter);
-    } else {
-      throw new IllegalArgumentException("unexpected path type");
-    }
-  }
-
   private List<AggregationDescriptor> constructAggregationDescriptorList(
       Set<Expression> aggregationExpressions, AggregationStep curStep) {
     return aggregationExpressions.stream()
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index 87c04426010..2c371732f4f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -18,18 +18,12 @@
  */
 package org.apache.iotdb.db.queryengine.plan.planner;
 
-import org.apache.iotdb.commons.path.AlignedPath;
-import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
-import org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
-import org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
-import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
-import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
 import org.apache.iotdb.db.queryengine.plan.expression.visitor.TransformToViewExpressionVisitor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.ExplainAnalyzeNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
@@ -95,8 +89,6 @@ import org.apache.iotdb.db.schemaengine.template.Template;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
 
-import org.apache.commons.lang3.StringUtils;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -105,7 +97,6 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME;
-import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME;
 
 /**
  * This visitor is used to generate a logical plan for the statement and returns the {@link
@@ -179,7 +170,6 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
                     analysis.getDeviceToGroupByExpression() != null
                         ? analysis.getDeviceToGroupByExpression().get(deviceName)
                         : null,
-                    analysis.getDeviceViewInputIndexesMap().get(deviceName),
                     context));
         // order by device, expression, push down sortOperator
         if (queryStatement.needPushDownSort()) {
@@ -210,7 +200,6 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
                   analysis.getWhereExpression(),
                   analysis.getAggregationExpressions(),
                   analysis.getGroupByExpression(),
-                  null,
                   context));
     }
 
@@ -255,7 +244,6 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
       Expression whereExpression,
       Set<Expression> aggregationExpressions,
       Expression groupByExpression,
-      List<Integer> deviceViewInputIndexes,
       MPPQueryContext context) {
     LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
     if (aggregationExpressions == null) {
@@ -275,96 +263,56 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
                   queryStatement.getResultTimeOrder());
     } else {
       // aggregation query
-      boolean isRawDataSource =
-          analysis.hasValueFilter()
-              || analysis.hasGroupByParameter()
-              || needTransform(sourceTransformExpressions)
-              || cannotUseStatistics(aggregationExpressions, sourceTransformExpressions);
-      if (queryStatement.isOutputEndTime()) {
-        context.getTypeProvider().setType(ENDTIME, TSDataType.INT64);
-      }
-      AggregationStep curStep;
-      if (isRawDataSource) {
-        planBuilder =
-            planBuilder
-                .planRawDataSource(
-                    sourceExpressions,
-                    queryStatement.getResultTimeOrder(),
-                    0,
-                    0,
-                    analysis.isLastLevelUseWildcard())
-                .planWhereAndSourceTransform(
-                    whereExpression,
-                    sourceTransformExpressions,
-                    queryStatement.isGroupByTime(),
-                    queryStatement.getResultTimeOrder());
-
-        boolean outputPartial =
-            queryStatement.isGroupByLevel()
-                || (queryStatement.isGroupByTime()
-                    && analysis.getGroupByTimeParameter().hasOverlap());
-        curStep = outputPartial ? AggregationStep.PARTIAL : AggregationStep.SINGLE;
+      planBuilder =
+          planBuilder
+              .planRawDataSource(
+                  sourceExpressions,
+                  queryStatement.getResultTimeOrder(),
+                  0,
+                  0,
+                  analysis.isLastLevelUseWildcard())
+              .planWhereAndSourceTransform(
+                  whereExpression,
+                  sourceTransformExpressions,
+                  queryStatement.isGroupByTime(),
+                  queryStatement.getResultTimeOrder());
+
+      boolean outputPartial =
+          queryStatement.isGroupByLevel()
+              || (queryStatement.isGroupByTime()
+                  && analysis.getGroupByTimeParameter().hasOverlap());
+      AggregationStep curStep = outputPartial ? AggregationStep.PARTIAL : AggregationStep.SINGLE;
+      planBuilder =
+          planBuilder.planRawDataAggregation(
+              aggregationExpressions,
+              groupByExpression,
+              analysis.getGroupByTimeParameter(),
+              analysis.getGroupByParameter(),
+              queryStatement.isOutputEndTime(),
+              curStep,
+              queryStatement.getResultTimeOrder());
+
+      if (queryStatement.isGroupByTime() && analysis.getGroupByTimeParameter().hasOverlap()) {
+        curStep =
+            queryStatement.isGroupByLevel() ? AggregationStep.INTERMEDIATE : AggregationStep.FINAL;
         planBuilder =
-            planBuilder.planAggregation(
+            planBuilder.planSlidingWindowAggregation(
                 aggregationExpressions,
-                groupByExpression,
                 analysis.getGroupByTimeParameter(),
-                analysis.getGroupByParameter(),
-                queryStatement.isOutputEndTime(),
                 curStep,
                 queryStatement.getResultTimeOrder());
+      }
 
-        if (queryStatement.isGroupByTime() && analysis.getGroupByTimeParameter().hasOverlap()) {
-          curStep =
-              queryStatement.isGroupByLevel()
-                  ? AggregationStep.INTERMEDIATE
-                  : AggregationStep.FINAL;
-          planBuilder =
-              planBuilder.planSlidingWindowAggregation(
-                  aggregationExpressions,
-                  analysis.getGroupByTimeParameter(),
-                  curStep,
-                  queryStatement.getResultTimeOrder());
-        }
-
-        if (queryStatement.isGroupByLevel()) {
-          planBuilder =
-              planBuilder.planGroupByLevel(
-                  analysis.getCrossGroupByExpressions(),
-                  analysis.getGroupByTimeParameter(),
-                  queryStatement.getResultTimeOrder());
-        }
-      } else {
-        curStep =
-            (analysis.getCrossGroupByExpressions() != null
-                    || (analysis.getGroupByTimeParameter() != null
-                        && analysis.getGroupByTimeParameter().hasOverlap()))
-                ? AggregationStep.PARTIAL
-                : AggregationStep.SINGLE;
-
+      if (queryStatement.isGroupByLevel()) {
         planBuilder =
-            deviceViewInputIndexes == null
-                ? planBuilder.planAggregationSource(
-                    curStep,
-                    queryStatement.getResultTimeOrder(),
-                    analysis.getGroupByTimeParameter(),
-                    aggregationExpressions,
-                    sourceTransformExpressions,
-                    analysis.getCrossGroupByExpressions(),
-                    analysis.getTagKeys(),
-                    analysis.getTagValuesToGroupedTimeseriesOperands())
-                : planBuilder.planAggregationSourceWithIndexAdjust(
-                    curStep,
-                    queryStatement.getResultTimeOrder(),
-                    analysis.getGroupByTimeParameter(),
-                    aggregationExpressions,
-                    sourceTransformExpressions,
-                    analysis.getCrossGroupByExpressions(),
-                    deviceViewInputIndexes,
-                    queryStatement.isOutputEndTime());
+            planBuilder.planGroupByLevel(
+                analysis.getCrossGroupByExpressions(),
+                analysis.getGroupByTimeParameter(),
+                queryStatement.getResultTimeOrder());
       }
 
       if (queryStatement.isGroupByTime() && queryStatement.isOutputEndTime()) {
+        context.getTypeProvider().setType(ENDTIME, TSDataType.INT64);
         planBuilder =
             planBuilder.planEndTimeColumnInject(
                 analysis.getGroupByTimeParameter(),
@@ -395,55 +343,6 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
     return 0;
   }
 
-  private boolean needTransform(Set<Expression> expressions) {
-    for (Expression expression : expressions) {
-      if (ExpressionAnalyzer.checkIsNeedTransform(expression)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean cannotUseStatistics(
-      Set<Expression> expressions, Set<Expression> sourceTransformExpressions) {
-    for (Expression expression : expressions) {
-
-      if (expression instanceof FunctionExpression) {
-        FunctionExpression functionExpression = (FunctionExpression) expression;
-        // Disable statistics optimization of UDAF for now
-        if (functionExpression.isExternalAggregationFunctionExpression()) {
-          return true;
-        }
-
-        if (COUNT_TIME.equalsIgnoreCase(functionExpression.getFunctionName())) {
-          String alignedDeviceId = "";
-          for (Expression countTimeExpression : sourceTransformExpressions) {
-            TimeSeriesOperand ts = (TimeSeriesOperand) countTimeExpression;
-            if (!(ts.getPath() instanceof AlignedPath
-                || ((MeasurementPath) ts.getPath()).isUnderAlignedEntity())) {
-              return true;
-            }
-            if (StringUtils.isEmpty(alignedDeviceId)) {
-              alignedDeviceId = ts.getPath().getDevice();
-            } else if (!alignedDeviceId.equalsIgnoreCase(ts.getPath().getDevice())) {
-              // count_time from only one aligned device can use AlignedSeriesAggScan
-              return true;
-            }
-          }
-          return false;
-        }
-
-        if (!BuiltinAggregationFunction.canUseStatistics(functionExpression.getFunctionName())) {
-          return true;
-        }
-      } else {
-        throw new IllegalArgumentException(
-            String.format("Invalid Aggregation Expression: %s", expression.getExpressionString()));
-      }
-    }
-    return false;
-  }
-
   @Override
   public PlanNode visitCreateTimeseries(
       CreateTimeSeriesStatement createTimeSeriesStatement, MPPQueryContext context) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanner.java
index 78d939f019f..5f460f0267d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanner.java
@@ -21,12 +21,13 @@ package org.apache.iotdb.db.queryengine.plan.planner;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
 import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.optimization.AggregationPushDown;
 import org.apache.iotdb.db.queryengine.plan.optimization.PlanOptimizer;
 import org.apache.iotdb.db.queryengine.plan.optimization.PredicatePushDown;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.List;
 
 import static org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.LOGICAL_PLANNER;
@@ -35,7 +36,8 @@ import static org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.LOGI
 public class LogicalPlanner {
 
   private final MPPQueryContext context;
-  private final List<PlanOptimizer> optimizers = Collections.singletonList(new PredicatePushDown());
+  private final List<PlanOptimizer> optimizers =
+      Arrays.asList(new PredicatePushDown(), new AggregationPushDown());
 
   public LogicalPlanner(MPPQueryContext context) {
     this.context = context;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index a6d043f5ad2..9517179c98a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -192,6 +192,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortN
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
@@ -1753,11 +1754,12 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
   }
 
   @Override
-  public Operator visitAggregation(AggregationNode node, LocalExecutionPlanContext context) {
+  public Operator visitRawDataAggregation(
+      RawDataAggregationNode node, LocalExecutionPlanContext context) {
     checkArgument(
         !node.getAggregationDescriptorList().isEmpty(),
         "Aggregation descriptorList cannot be empty");
-    List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, context);
+    Operator child = node.getChild().accept(this, context);
     boolean ascending = node.getScanOrder() == Ordering.ASC;
     List<Aggregator> aggregators = new ArrayList<>();
     Map<String, List<InputLocation>> layout = makeLayout(node);
@@ -1779,128 +1781,154 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
               descriptor.getStep(),
               inputLocationList));
     }
-    boolean inputRaw = node.getAggregationDescriptorList().get(0).getStep().isInputRaw();
+
     GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
     GroupByParameter groupByParameter = node.getGroupByParameter();
 
-    if (inputRaw) {
-      checkArgument(children.size() == 1, "rawDataAggregateOperator can only accept one input");
-      OperatorContext operatorContext =
-          context
-              .getDriverContext()
-              .addOperatorContext(
-                  context.getNextOperatorId(),
-                  node.getPlanNodeId(),
-                  RawDataAggregationOperator.class.getSimpleName());
-
-      ITimeRangeIterator timeRangeIterator =
-          initTimeRangeIterator(groupByTimeParameter, ascending, true);
-      long maxReturnSize =
-          calculateMaxAggregationResultSize(
-              aggregationDescriptors, timeRangeIterator, context.getTypeProvider());
-
-      // groupByParameter and groupByTimeParameter
-      if (groupByParameter != null) {
-        WindowType windowType = groupByParameter.getWindowType();
-
-        WindowParameter windowParameter;
-        switch (windowType) {
-          case VARIATION_WINDOW:
-            Expression groupByVariationExpression = node.getGroupByExpression();
-            if (groupByVariationExpression == null) {
-              throw new IllegalArgumentException("groupByVariationExpression can't be null");
-            }
-            String controlColumn = groupByVariationExpression.getExpressionString();
-            TSDataType controlColumnType = context.getTypeProvider().getType(controlColumn);
-            windowParameter =
-                new VariationWindowParameter(
-                    controlColumnType,
-                    layout.get(controlColumn).get(0).getValueColumnIndex(),
-                    node.isOutputEndTime(),
-                    ((GroupByVariationParameter) groupByParameter).isIgnoringNull(),
-                    ((GroupByVariationParameter) groupByParameter).getDelta());
-            break;
-          case CONDITION_WINDOW:
-            Expression groupByConditionExpression = node.getGroupByExpression();
-            if (groupByConditionExpression == null) {
-              throw new IllegalArgumentException("groupByConditionExpression can't be null");
-            }
-            windowParameter =
-                new ConditionWindowParameter(
-                    node.isOutputEndTime(),
-                    ((GroupByConditionParameter) groupByParameter).isIgnoringNull(),
-                    layout
-                        .get(groupByConditionExpression.getExpressionString())
-                        .get(0)
-                        .getValueColumnIndex(),
-                    ((GroupByConditionParameter) groupByParameter).getKeepExpression());
-            break;
-          case SESSION_WINDOW:
-            windowParameter =
-                new SessionWindowParameter(
-                    ((GroupBySessionParameter) groupByParameter).getTimeInterval(),
-                    node.isOutputEndTime());
-            break;
-          case COUNT_WINDOW:
-            Expression groupByCountExpression = node.getGroupByExpression();
-            if (groupByCountExpression == null) {
-              throw new IllegalArgumentException("groupByCountExpression can't be null");
-            }
-            windowParameter =
-                new CountWindowParameter(
-                    ((GroupByCountParameter) groupByParameter).getCountNumber(),
-                    layout
-                        .get(groupByCountExpression.getExpressionString())
-                        .get(0)
-                        .getValueColumnIndex(),
-                    node.isOutputEndTime(),
-                    ((GroupByCountParameter) groupByParameter).isIgnoreNull());
-            break;
-          default:
-            throw new IllegalArgumentException("Unsupported window type");
-        }
-        return new RawDataAggregationOperator(
-            operatorContext,
-            aggregators,
-            timeRangeIterator,
-            children.get(0),
-            ascending,
-            maxReturnSize,
-            windowParameter);
-      }
+    OperatorContext operatorContext =
+        context
+            .getDriverContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                RawDataAggregationOperator.class.getSimpleName());
+
+    ITimeRangeIterator timeRangeIterator =
+        initTimeRangeIterator(groupByTimeParameter, ascending, true);
+    long maxReturnSize =
+        calculateMaxAggregationResultSize(
+            aggregationDescriptors, timeRangeIterator, context.getTypeProvider());
 
-      WindowParameter windowParameter = new TimeWindowParameter(node.isOutputEndTime());
+    // groupByParameter and groupByTimeParameter
+    if (groupByParameter != null) {
+      WindowType windowType = groupByParameter.getWindowType();
+
+      WindowParameter windowParameter;
+      switch (windowType) {
+        case VARIATION_WINDOW:
+          Expression groupByVariationExpression = node.getGroupByExpression();
+          if (groupByVariationExpression == null) {
+            throw new IllegalArgumentException("groupByVariationExpression can't be null");
+          }
+          String controlColumn = groupByVariationExpression.getExpressionString();
+          TSDataType controlColumnType = context.getTypeProvider().getType(controlColumn);
+          windowParameter =
+              new VariationWindowParameter(
+                  controlColumnType,
+                  layout.get(controlColumn).get(0).getValueColumnIndex(),
+                  node.isOutputEndTime(),
+                  ((GroupByVariationParameter) groupByParameter).isIgnoringNull(),
+                  ((GroupByVariationParameter) groupByParameter).getDelta());
+          break;
+        case CONDITION_WINDOW:
+          Expression groupByConditionExpression = node.getGroupByExpression();
+          if (groupByConditionExpression == null) {
+            throw new IllegalArgumentException("groupByConditionExpression can't be null");
+          }
+          windowParameter =
+              new ConditionWindowParameter(
+                  node.isOutputEndTime(),
+                  ((GroupByConditionParameter) groupByParameter).isIgnoringNull(),
+                  layout
+                      .get(groupByConditionExpression.getExpressionString())
+                      .get(0)
+                      .getValueColumnIndex(),
+                  ((GroupByConditionParameter) groupByParameter).getKeepExpression());
+          break;
+        case SESSION_WINDOW:
+          windowParameter =
+              new SessionWindowParameter(
+                  ((GroupBySessionParameter) groupByParameter).getTimeInterval(),
+                  node.isOutputEndTime());
+          break;
+        case COUNT_WINDOW:
+          Expression groupByCountExpression = node.getGroupByExpression();
+          if (groupByCountExpression == null) {
+            throw new IllegalArgumentException("groupByCountExpression can't be null");
+          }
+          windowParameter =
+              new CountWindowParameter(
+                  ((GroupByCountParameter) groupByParameter).getCountNumber(),
+                  layout
+                      .get(groupByCountExpression.getExpressionString())
+                      .get(0)
+                      .getValueColumnIndex(),
+                  node.isOutputEndTime(),
+                  ((GroupByCountParameter) groupByParameter).isIgnoreNull());
+          break;
+        default:
+          throw new IllegalArgumentException("Unsupported window type");
+      }
       return new RawDataAggregationOperator(
           operatorContext,
           aggregators,
           timeRangeIterator,
-          children.get(0),
+          child,
           ascending,
           maxReturnSize,
           windowParameter);
-    } else {
-      OperatorContext operatorContext =
-          context
-              .getDriverContext()
-              .addOperatorContext(
-                  context.getNextOperatorId(),
-                  node.getPlanNodeId(),
-                  AggregationOperator.class.getSimpleName());
+    }
 
-      ITimeRangeIterator timeRangeIterator =
-          initTimeRangeIterator(groupByTimeParameter, ascending, true);
-      long maxReturnSize =
-          calculateMaxAggregationResultSize(
-              aggregationDescriptors, timeRangeIterator, context.getTypeProvider());
+    WindowParameter windowParameter = new TimeWindowParameter(node.isOutputEndTime());
+    return new RawDataAggregationOperator(
+        operatorContext,
+        aggregators,
+        timeRangeIterator,
+        child,
+        ascending,
+        maxReturnSize,
+        windowParameter);
+  }
 
-      return new AggregationOperator(
-          operatorContext,
-          aggregators,
-          timeRangeIterator,
-          children,
-          node.isOutputEndTime(),
-          maxReturnSize);
+  @Override
+  public Operator visitAggregation(AggregationNode node, LocalExecutionPlanContext context) {
+    checkArgument(
+        !node.getAggregationDescriptorList().isEmpty(),
+        "Aggregation descriptorList cannot be empty");
+    List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, context);
+    boolean ascending = node.getScanOrder() == Ordering.ASC;
+    List<Aggregator> aggregators = new ArrayList<>();
+    Map<String, List<InputLocation>> layout = makeLayout(node);
+    List<AggregationDescriptor> aggregationDescriptors = node.getAggregationDescriptorList();
+    for (AggregationDescriptor descriptor : node.getAggregationDescriptorList()) {
+      List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout);
+      aggregators.add(
+          new Aggregator(
+              AccumulatorFactory.createAccumulator(
+                  descriptor.getAggregationFuncName(),
+                  descriptor.getAggregationType(),
+                  descriptor.getInputExpressions().stream()
+                      .map(x -> context.getTypeProvider().getType(x.getExpressionString()))
+                      .collect(Collectors.toList()),
+                  descriptor.getInputExpressions(),
+                  descriptor.getInputAttributes(),
+                  ascending,
+                  descriptor.getStep().isInputRaw()),
+              descriptor.getStep(),
+              inputLocationList));
     }
+
+    OperatorContext operatorContext =
+        context
+            .getDriverContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                AggregationOperator.class.getSimpleName());
+
+    ITimeRangeIterator timeRangeIterator =
+        initTimeRangeIterator(node.getGroupByTimeParameter(), ascending, true);
+    long maxReturnSize =
+        calculateMaxAggregationResultSize(
+            aggregationDescriptors, timeRangeIterator, context.getTypeProvider());
+
+    return new AggregationOperator(
+        operatorContext,
+        aggregators,
+        timeRangeIterator,
+        children,
+        node.isOutputEndTime(),
+        maxReturnSize);
   }
 
   private List<InputLocation[]> calcInputLocationList(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
index 3ae7a3f126f..958e85592ea 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
@@ -46,6 +46,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
@@ -381,6 +382,11 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
     return processOneChildNode(node, context);
   }
 
+  @Override
+  public PlanNode visitRawDataAggregation(RawDataAggregationNode node, NodeGroupContext context) {
+    return processOneChildNode(node, context); // no node-specific logic; shared one-child path
+  }
+
   @Override
   public PlanNode visitExplainAnalyze(ExplainAnalyzeNode node, NodeGroupContext context) {
     ExplainAnalyzeNode newNode = (ExplainAnalyzeNode) node.clone();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index bda66843a97..c96d87573b9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -78,6 +78,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
@@ -209,6 +210,8 @@ public enum PlanNodeType {
   EXPLAIN_ANALYZE((short) 90),
 
   PIPE_OPERATE_SCHEMA_QUEUE_REFERENCE((short) 91),
+
+  RAW_DATA_AGGREGATION((short) 92),
   ;
 
   public static final int BYTES = Short.BYTES;
@@ -440,6 +443,8 @@ public enum PlanNodeType {
         return ExplainAnalyzeNode.deserialize(buffer);
       case 91:
         return PipeOperateSchemaQueueNode.deserialize(buffer);
+      case 92:
+        return RawDataAggregationNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index 608ca14df24..932a9d22597 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -76,6 +76,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortN
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode;
@@ -222,6 +223,10 @@ public abstract class PlanVisitor<R, C> {
     return visitSingleChildProcess(node, context);
   }
 
+  public R visitRawDataAggregation(RawDataAggregationNode node, C context) {
+    return visitSingleChildProcess(node, context); // falls back to the single-child handler
+  }
+
   // two child -----------------------------------------------------------------------------------
 
   public R visitTwoChildProcess(TwoChildProcessNode node, C context) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RawDataAggregationNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RawDataAggregationNode.java
new file mode 100644
index 00000000000..ab2683743a3
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RawDataAggregationNode.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process;
+
+import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
+import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByParameter;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode.getDeduplicatedDescriptors;
+
+/**
+ * Single-child plan node that carries a list of aggregation descriptors together with the
+ * optional grouping parameters ({@code group by time}, {@code group by} and its expression).
+ *
+ * <p>Its type tag is {@link PlanNodeType#RAW_DATA_AGGREGATION}. The attribute order written by
+ * {@code serializeAttributes} must stay in sync with the order read by {@link #deserialize}.
+ */
+public class RawDataAggregationNode extends SingleChildProcessNode {
+
+  // The list of aggregate functions, each AggregationDescriptor will be output as one or two
+  // columns of the result TsBlock
+  protected List<AggregationDescriptor> aggregationDescriptorList;
+
+  // The parameter of `group by time`.
+  // Its value will be null if there is no `group by time` clause.
+  @Nullable protected GroupByTimeParameter groupByTimeParameter;
+
+  // The parameter of `group by`.
+  // Its value will be null if there is no `group by` clause.
+  @Nullable protected GroupByParameter groupByParameter;
+
+  // In some situations of the `group by` clause, groupByExpression is required.
+  // It will be null if the clause doesn't refer to any expression.
+  @Nullable protected Expression groupByExpression;
+
+  // Order value supplied by the creator of this node; serialized as its ordinal.
+  protected Ordering scanOrder;
+
+  // When true, ColumnHeaderConstant.ENDTIME is the first output column name.
+  protected boolean outputEndTime = false;
+
+  /** Creates a child-less node that only has the optional {@code group by time} parameter. */
+  public RawDataAggregationNode(
+      PlanNodeId id,
+      List<AggregationDescriptor> aggregationDescriptorList,
+      @Nullable GroupByTimeParameter groupByTimeParameter,
+      Ordering scanOrder) {
+    this(id, aggregationDescriptorList, groupByTimeParameter, null, null, false, scanOrder);
+  }
+
+  /** Creates a node over {@code child} with only the optional {@code group by time} parameter. */
+  public RawDataAggregationNode(
+      PlanNodeId id,
+      PlanNode child,
+      List<AggregationDescriptor> aggregationDescriptorList,
+      @Nullable GroupByTimeParameter groupByTimeParameter,
+      Ordering scanOrder) {
+    this(
+        id, child, aggregationDescriptorList, groupByTimeParameter, null, null, false, scanOrder);
+  }
+
+  /**
+   * Creates a child-less node with every attribute given explicitly. The descriptor list is
+   * passed through {@code getDeduplicatedDescriptors} before it is stored.
+   */
+  public RawDataAggregationNode(
+      PlanNodeId id,
+      List<AggregationDescriptor> aggregationDescriptorList,
+      @Nullable GroupByTimeParameter groupByTimeParameter,
+      @Nullable GroupByParameter groupByParameter,
+      @Nullable Expression groupByExpression,
+      boolean outputEndTime,
+      Ordering scanOrder) {
+    super(id);
+    this.aggregationDescriptorList = getDeduplicatedDescriptors(aggregationDescriptorList);
+    this.groupByTimeParameter = groupByTimeParameter;
+    this.groupByParameter = groupByParameter;
+    this.groupByExpression = groupByExpression;
+    this.outputEndTime = outputEndTime;
+    this.scanOrder = scanOrder;
+  }
+
+  /**
+   * Creates a node over {@code child} with every attribute given explicitly. The descriptor list
+   * is passed through {@code getDeduplicatedDescriptors} before it is stored.
+   */
+  public RawDataAggregationNode(
+      PlanNodeId id,
+      PlanNode child,
+      List<AggregationDescriptor> aggregationDescriptorList,
+      @Nullable GroupByTimeParameter groupByTimeParameter,
+      @Nullable GroupByParameter groupByParameter,
+      @Nullable Expression groupByExpression,
+      boolean outputEndTime,
+      Ordering scanOrder) {
+    super(id, child);
+    this.aggregationDescriptorList = getDeduplicatedDescriptors(aggregationDescriptorList);
+    this.groupByTimeParameter = groupByTimeParameter;
+    this.groupByParameter = groupByParameter;
+    this.groupByExpression = groupByExpression;
+    this.outputEndTime = outputEndTime;
+    this.scanOrder = scanOrder;
+  }
+
+  /** Returns the descriptor list as stored by the constructor. */
+  public List<AggregationDescriptor> getAggregationDescriptorList() {
+    return aggregationDescriptorList;
+  }
+
+  @Nullable
+  public GroupByTimeParameter getGroupByTimeParameter() {
+    return groupByTimeParameter;
+  }
+
+  @Nullable
+  public GroupByParameter getGroupByParameter() {
+    return groupByParameter;
+  }
+
+  public Ordering getScanOrder() {
+    return scanOrder;
+  }
+
+  public boolean isOutputEndTime() {
+    return outputEndTime;
+  }
+
+  public void setOutputEndTime(boolean outputEndTime) {
+    this.outputEndTime = outputEndTime;
+  }
+
+  @Nullable
+  public Expression getGroupByExpression() {
+    return groupByExpression;
+  }
+
+  @Override
+  public PlanNodeType getType() {
+    return PlanNodeType.RAW_DATA_AGGREGATION;
+  }
+
+  /**
+   * Returns a new node with the same id and attributes. The child of this node is not handed to
+   * the copy's constructor.
+   */
+  @Override
+  public PlanNode clone() {
+    return new RawDataAggregationNode(
+        getPlanNodeId(),
+        getAggregationDescriptorList(),
+        getGroupByTimeParameter(),
+        getGroupByParameter(),
+        getGroupByExpression(),
+        outputEndTime,
+        getScanOrder());
+  }
+
+  /**
+   * Lists {@code ENDTIME} first when {@code outputEndTime} is set, then the output column names
+   * of every descriptor in list order.
+   */
+  @Override
+  public List<String> getOutputColumnNames() {
+    List<String> outputColumnNames = new ArrayList<>();
+    if (outputEndTime) {
+      outputColumnNames.add(ColumnHeaderConstant.ENDTIME);
+    }
+    outputColumnNames.addAll(
+        aggregationDescriptorList.stream()
+            .map(AggregationDescriptor::getOutputColumnNames)
+            .flatMap(List::stream)
+            .collect(Collectors.toList()));
+
+    return outputColumnNames;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitRawDataAggregation(this, context);
+  }
+
+  /**
+   * Writes, in order: the node type, the descriptor count and each descriptor, then {@code
+   * groupByTimeParameter}, {@code groupByParameter} and {@code groupByExpression}, each preceded
+   * by a presence byte (0 = null and nothing follows, 1 = value follows), then {@code
+   * outputEndTime} and the ordinal of {@code scanOrder}.
+   */
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.RAW_DATA_AGGREGATION.serialize(byteBuffer);
+    ReadWriteIOUtils.write(aggregationDescriptorList.size(), byteBuffer);
+    for (AggregationDescriptor aggregationDescriptor : aggregationDescriptorList) {
+      aggregationDescriptor.serialize(byteBuffer);
+    }
+    if (groupByTimeParameter == null) {
+      ReadWriteIOUtils.write((byte) 0, byteBuffer);
+    } else {
+      ReadWriteIOUtils.write((byte) 1, byteBuffer);
+      groupByTimeParameter.serialize(byteBuffer);
+    }
+    if (groupByParameter == null) {
+      ReadWriteIOUtils.write((byte) 0, byteBuffer);
+    } else {
+      ReadWriteIOUtils.write((byte) 1, byteBuffer);
+      groupByParameter.serialize(byteBuffer);
+    }
+    if (groupByExpression == null) {
+      ReadWriteIOUtils.write((byte) 0, byteBuffer);
+    } else {
+      ReadWriteIOUtils.write((byte) 1, byteBuffer);
+      Expression.serialize(groupByExpression, byteBuffer);
+    }
+    ReadWriteIOUtils.write(outputEndTime, byteBuffer);
+    ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer);
+  }
+
+  /** Stream variant of {@link #serializeAttributes(ByteBuffer)} with the same field order. */
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    PlanNodeType.RAW_DATA_AGGREGATION.serialize(stream);
+    ReadWriteIOUtils.write(aggregationDescriptorList.size(), stream);
+    for (AggregationDescriptor aggregationDescriptor : aggregationDescriptorList) {
+      aggregationDescriptor.serialize(stream);
+    }
+    if (groupByTimeParameter == null) {
+      ReadWriteIOUtils.write((byte) 0, stream);
+    } else {
+      ReadWriteIOUtils.write((byte) 1, stream);
+      groupByTimeParameter.serialize(stream);
+    }
+    if (groupByParameter == null) {
+      ReadWriteIOUtils.write((byte) 0, stream);
+    } else {
+      ReadWriteIOUtils.write((byte) 1, stream);
+      groupByParameter.serialize(stream);
+    }
+    if (groupByExpression == null) {
+      ReadWriteIOUtils.write((byte) 0, stream);
+    } else {
+      ReadWriteIOUtils.write((byte) 1, stream);
+      Expression.serialize(groupByExpression, stream);
+    }
+    ReadWriteIOUtils.write(outputEndTime, stream);
+    ReadWriteIOUtils.write(scanOrder.ordinal(), stream);
+  }
+
+  /**
+   * Reads the attributes in the order written by {@code serializeAttributes}, starting at the
+   * descriptor count (the type tag is not read here), and then reads the {@link PlanNodeId}.
+   */
+  public static RawDataAggregationNode deserialize(ByteBuffer byteBuffer) {
+    int descriptorSize = ReadWriteIOUtils.readInt(byteBuffer);
+    List<AggregationDescriptor> aggregationDescriptorList = new ArrayList<>();
+    for (int i = 0; i < descriptorSize; i++) {
+      aggregationDescriptorList.add(AggregationDescriptor.deserialize(byteBuffer));
+    }
+    // Each optional attribute is preceded by a presence byte; 1 means a value follows.
+    GroupByTimeParameter groupByTimeParameter = null;
+    if (ReadWriteIOUtils.readByte(byteBuffer) == 1) {
+      groupByTimeParameter = GroupByTimeParameter.deserialize(byteBuffer);
+    }
+    GroupByParameter groupByParameter = null;
+    if (ReadWriteIOUtils.readByte(byteBuffer) == 1) {
+      groupByParameter = GroupByParameter.deserialize(byteBuffer);
+    }
+    Expression groupByExpression = null;
+    if (ReadWriteIOUtils.readByte(byteBuffer) == 1) {
+      groupByExpression = Expression.deserialize(byteBuffer);
+    }
+    boolean outputEndTime = ReadWriteIOUtils.readBool(byteBuffer);
+    Ordering scanOrder = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new RawDataAggregationNode(
+        planNodeId,
+        aggregationDescriptorList,
+        groupByTimeParameter,
+        groupByParameter,
+        groupByExpression,
+        outputEndTime,
+        scanOrder);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    RawDataAggregationNode that = (RawDataAggregationNode) o;
+    return outputEndTime == that.outputEndTime
+        && scanOrder == that.scanOrder
+        && Objects.equals(aggregationDescriptorList, that.aggregationDescriptorList)
+        && Objects.equals(groupByTimeParameter, that.groupByTimeParameter)
+        && Objects.equals(groupByParameter, that.groupByParameter)
+        && Objects.equals(groupByExpression, that.groupByExpression);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(
+        super.hashCode(),
+        aggregationDescriptorList,
+        groupByTimeParameter,
+        groupByParameter,
+        groupByExpression,
+        outputEndTime,
+        scanOrder);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("RawDataAggregationNode-%s", getPlanNodeId());
+  }
+}