You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/18 17:40:46 UTC

[iotdb] 02/03: complete basic timejoin aggregation

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/agg_distribution_plan
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3ce2308d23b84410b042b9e821e67df52d12781c
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu May 19 00:43:38 2022 +0800

    complete basic timejoin aggregation
---
 .../db/mpp/plan/planner/DistributionPlanner.java   | 135 +++++++++++++++++----
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java |   4 +
 .../planner/plan/node/process/AggregationNode.java |  10 +-
 .../planner/plan/node/process/MultiChildNode.java  |  39 ++++++
 .../planner/plan/node/process/TimeJoinNode.java    |  15 +--
 .../source/AlignedSeriesAggregationScanNode.java   |  24 ++--
 .../node/source/SeriesAggregationScanNode.java     |  29 ++---
 .../node/source/SeriesAggregationSourceNode.java   |  45 +++++++
 .../plan/parameter/AggregationDescriptor.java      |   4 +
 9 files changed, 234 insertions(+), 71 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
index 9df8f13875..ff664d93fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.plan.planner;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
@@ -41,16 +42,20 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryM
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesSourceNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
+import org.checkerframework.checker.units.qual.A;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -327,8 +332,14 @@ public class DistributionPlanner {
 
     @Override
     public PlanNode visitTimeJoin(TimeJoinNode node, DistributionPlanContext context) {
-      TimeJoinNode root = (TimeJoinNode) node.clone();
+      // Although some logic is similar between Aggregation and RawDataQuery,
+      // we still use separate method to process the distribution planning now
+      // to make the planning procedure more clear
+      if (isAggregationQuery(node)) {
+        return planAggregationWithTimeJoin(node, context);
+      }
 
+      TimeJoinNode root = (TimeJoinNode) node.clone();
       // Step 1: Get all source nodes. For the node which is not source, add it as the child of
       // current TimeJoinNode
       List<SourceNode> sources = new ArrayList<>();
@@ -347,34 +358,12 @@ public class DistributionPlanner {
             split.setRegionReplicaSet(dataRegion);
             sources.add(split);
           }
-        } else if (child instanceof AlignedSeriesScanNode) {
-          AlignedSeriesScanNode handle = (AlignedSeriesScanNode) child;
-          List<TRegionReplicaSet> dataDistribution =
-              analysis.getPartitionInfo(handle.getAlignedPath(), handle.getTimeFilter());
-          // If the size of dataDistribution is m, this SeriesScanNode should be seperated into m
-          // SeriesScanNode.
-          for (TRegionReplicaSet dataRegion : dataDistribution) {
-            AlignedSeriesScanNode split = (AlignedSeriesScanNode) handle.clone();
-            split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
-            split.setRegionReplicaSet(dataRegion);
-            sources.add(split);
-          }
-        } else if (child instanceof SeriesAggregationScanNode) {
-          // TODO: (xingtanzjr) We should do the same thing for SeriesAggregateScanNode. Consider to
-          // make SeriesAggregateScanNode
-          // and SeriesScanNode to derived from the same parent Class because they have similar
-          // process logic in many scenarios
-        } else {
-          // In a general logical query plan, the children of TimeJoinNode should only be
-          // SeriesScanNode or SeriesAggregateScanNode
-          // So this branch should not be touched.
-          root.addChild(visit(child, context));
         }
       }
-
       // Step 2: For the source nodes, group them by the DataRegion.
       Map<TRegionReplicaSet, List<SourceNode>> sourceGroup =
           sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
+
       // Step 3: For the source nodes which belong to same data region, add a TimeJoinNode for them
       // and make the
       // new TimeJoinNode as the child of current TimeJoinNode
@@ -392,16 +381,102 @@ public class DistributionPlanner {
                 // We clone a TimeJoinNode from root to make the params to be consistent.
                 // But we need to assign a new ID to it
                 TimeJoinNode parentOfGroup = (TimeJoinNode) root.clone();
-                root.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+                parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
                 seriesScanNodes.forEach(parentOfGroup::addChild);
                 root.addChild(parentOfGroup);
               }
             }
           });
 
+      // Process the other children which are not SeriesSourceNode
+      for (PlanNode child : node.getChildren()) {
+        if (!(child instanceof SeriesSourceNode)) {
+          // In a general logical query plan, the children of TimeJoinNode should only be
+          // SeriesScanNode or SeriesAggregateScanNode
+          // So this branch should not be touched.
+          root.addChild(visit(child, context));
+        }
+      }
+
       return root;
     }
 
+    private boolean isAggregationQuery(TimeJoinNode node) {
+      for (PlanNode child : node.getChildren()) {
+        if (child instanceof SeriesAggregationScanNode
+            || child instanceof AlignedSeriesAggregationScanNode) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    private PlanNode planAggregationWithTimeJoin(
+        TimeJoinNode root,
+        DistributionPlanContext context) {
+
+      // Step 1: construct AggregationDescriptor for AggregationNode
+      List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
+      List<SeriesAggregationSourceNode> sources = new ArrayList<>();
+      Map<PartialPath, Integer> regionCountPerSeries = new HashMap<>();
+      for (PlanNode child : root.getChildren()) {
+        SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode) child;
+        handle.getAggregationDescriptorList()
+            .forEach(
+                descriptor -> {
+                  rootAggDescriptorList.add(
+                      new AggregationDescriptor(
+                          descriptor.getAggregationType(),
+                          AggregationStep.FINAL,
+                          descriptor.getInputExpressions()));
+                });
+        List<TRegionReplicaSet> dataDistribution =
+            analysis.getPartitionInfo(handle.getPartitionPath(), handle.getPartitionTimeFilter());
+        for (TRegionReplicaSet dataRegion : dataDistribution) {
+          SeriesAggregationSourceNode split = (SeriesAggregationSourceNode) handle.clone();
+          split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+          split.setRegionReplicaSet(dataRegion);
+          // Let each split reference different object of AggregationDescriptorList
+          split.setAggregationDescriptorList(handle.getAggregationDescriptorList().stream().map(AggregationDescriptor::deepClone).collect(Collectors.toList()));
+          sources.add(split);
+        }
+        regionCountPerSeries.put(handle.getPartitionPath(), dataDistribution.size());
+      }
+
+      // Step 2: change the step for each SeriesAggregationSourceNode according to its split count
+      for (SeriesAggregationSourceNode node : sources) {
+        boolean isFinal = regionCountPerSeries.get(node.getPartitionPath()) == 1;
+        node.getAggregationDescriptorList().forEach(d -> d.setStep(isFinal ? AggregationStep.FINAL : AggregationStep.PARTIAL));
+      }
+
+      Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup =
+          sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
+
+      AggregationNode aggregationNode = new AggregationNode(context.queryContext.getQueryId().genPlanNodeId(), rootAggDescriptorList);
+
+      final boolean[] addParent = {false};
+      sourceGroup.forEach(
+          (dataRegion, sourceNodes) -> {
+            if (sourceNodes.size() == 1) {
+              aggregationNode.addChild(sourceNodes.get(0));
+            } else {
+              if (!addParent[0]) {
+                sourceNodes.forEach(aggregationNode::addChild);
+                addParent[0] = true;
+              } else {
+                // We clone a TimeJoinNode from root to make the params to be consistent.
+                // But we need to assign a new ID to it
+                TimeJoinNode parentOfGroup = (TimeJoinNode) root.clone();
+                parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+                sourceNodes.forEach(parentOfGroup::addChild);
+                aggregationNode.addChild(parentOfGroup);
+              }
+            }
+          });
+
+      return aggregationNode;
+    }
+
     public PlanNode visit(PlanNode node, DistributionPlanContext context) {
       return node.accept(this, context);
     }
@@ -527,7 +602,15 @@ public class DistributionPlanner {
 
     @Override
     public PlanNode visitTimeJoin(TimeJoinNode node, NodeGroupContext context) {
-      TimeJoinNode newNode = (TimeJoinNode) node.clone();
+      return processMultiChildNode(node, context);
+    }
+
+    public PlanNode visitAggregation(AggregationNode node, NodeGroupContext context) {
+      return processMultiChildNode(node, context);
+    }
+
+    private PlanNode processMultiChildNode(MultiChildNode node, NodeGroupContext context) {
+      MultiChildNode newNode = (MultiChildNode) node.clone();
       List<PlanNode> visitedChildren = new ArrayList<>();
       node.getChildren()
           .forEach(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 4d552df27b..73df5ae27c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -74,6 +74,10 @@ public abstract class PlanVisitor<R, C> {
     return visitPlan(node, context);
   }
 
+  public R visitAggregation(AggregationNode node, C context) {
+    return visitPlan(node, context);
+  }
+
   public R visitAlignedSeriesScan(AlignedSeriesScanNode node, C context) {
     return visitPlan(node, context);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
index e02958ea62..e51045fb97 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
@@ -42,7 +42,7 @@ import java.util.stream.Collectors;
  * input as a TsBlock, it may be raw data or partial aggregation result. This node will output the
  * final series aggregated result represented by TsBlock.
  */
-public class AggregationNode extends ProcessNode {
+public class AggregationNode extends MultiChildNode {
 
   // The list of aggregate functions, each AggregateDescriptor will be output as one column of
   // result TsBlock
@@ -53,8 +53,6 @@ public class AggregationNode extends ProcessNode {
   @Nullable protected GroupByTimeParameter groupByTimeParameter;
   protected OrderBy scanOrder = OrderBy.TIMESTAMP_ASC;
 
-  protected List<PlanNode> children;
-
   public AggregationNode(
       PlanNodeId id,
       List<PlanNode> children,
@@ -67,8 +65,7 @@ public class AggregationNode extends ProcessNode {
       List<PlanNode> children,
       List<AggregationDescriptor> aggregationDescriptorList,
       @Nullable GroupByTimeParameter groupByTimeParameter) {
-    super(id);
-    this.children = children;
+    super(id, children);
     this.aggregationDescriptorList = getDeduplicatedDescriptors(aggregationDescriptorList);
     this.groupByTimeParameter = groupByTimeParameter;
   }
@@ -81,10 +78,9 @@ public class AggregationNode extends ProcessNode {
       PlanNodeId id,
       List<AggregationDescriptor> aggregationDescriptorList,
       @Nullable GroupByTimeParameter groupByTimeParameter) {
-    super(id);
+    super(id, new ArrayList<>());
     this.aggregationDescriptorList = getDeduplicatedDescriptors(aggregationDescriptorList);
     this.groupByTimeParameter = groupByTimeParameter;
-    this.children = new ArrayList<>();
   }
 
   public List<AggregationDescriptor> getAggregationDescriptorList() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java
new file mode 100644
index 0000000000..61da38a121
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
+
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+
+import java.util.List;
+
+public abstract class MultiChildNode extends ProcessNode {
+
+  protected List<PlanNode> children;
+
+  public MultiChildNode(PlanNodeId id, List<PlanNode> children) {
+    super(id);
+    this.children = children;
+  }
+
+  public void setChildren(List<PlanNode> children) {
+    this.children = children;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
index a39d0af54f..f081270e2b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
@@ -36,26 +36,19 @@ import java.util.stream.Collectors;
  * timestamp column. It will join two or more TsBlock by Timestamp column. The output result of
  * TimeJoinOperator is sorted by timestamp
  */
-public class TimeJoinNode extends ProcessNode {
+public class TimeJoinNode extends MultiChildNode {
 
   // This parameter indicates the order when executing multiway merge sort.
   private final OrderBy mergeOrder;
 
-  private List<PlanNode> children;
-
   public TimeJoinNode(PlanNodeId id, OrderBy mergeOrder) {
-    super(id);
+    super(id, new ArrayList<>());
     this.mergeOrder = mergeOrder;
-    this.children = new ArrayList<>();
   }
 
   public TimeJoinNode(PlanNodeId id, OrderBy mergeOrder, List<PlanNode> children) {
-    this(id, mergeOrder);
-    this.children = children;
-  }
-
-  public void setChildren(List<PlanNode> children) {
-    this.children = children;
+    super(id, children);
+    this.mergeOrder = mergeOrder;
   }
 
   public OrderBy getMergeOrder() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
index c57e6692b5..db9c3fe354 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
@@ -44,15 +45,11 @@ import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
-public class AlignedSeriesAggregationScanNode extends SourceNode {
+public class AlignedSeriesAggregationScanNode extends SeriesAggregationSourceNode {
 
   // The paths of the target series which will be aggregated.
   private final AlignedPath alignedPath;
 
-  // The list of aggregate functions, each AggregateDescriptor will be output as one column in
-  // result TsBlock
-  private final List<AggregationDescriptor> aggregationDescriptorList;
-
   // The order to traverse the data.
   // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
   // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
@@ -72,9 +69,8 @@ public class AlignedSeriesAggregationScanNode extends SourceNode {
       PlanNodeId id,
       AlignedPath alignedPath,
       List<AggregationDescriptor> aggregationDescriptorList) {
-    super(id);
+    super(id, aggregationDescriptorList);
     this.alignedPath = alignedPath;
-    this.aggregationDescriptorList = aggregationDescriptorList;
   }
 
   public AlignedSeriesAggregationScanNode(
@@ -105,10 +101,6 @@ public class AlignedSeriesAggregationScanNode extends SourceNode {
     return alignedPath;
   }
 
-  public List<AggregationDescriptor> getAggregationDescriptorList() {
-    return aggregationDescriptorList;
-  }
-
   public OrderBy getScanOrder() {
     return scanOrder;
   }
@@ -270,4 +262,14 @@ public class AlignedSeriesAggregationScanNode extends SourceNode {
         groupByTimeParameter,
         regionReplicaSet);
   }
+
+  @Override
+  public PartialPath getPartitionPath() {
+    return alignedPath;
+  }
+
+  @Override
+  public Filter getPartitionTimeFilter() {
+    return timeFilter;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
index ced35c4fb4..f30caee4c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
@@ -59,15 +60,11 @@ import java.util.stream.Collectors;
  * represent the whole aggregation result of this series. And the timestamp will be 0, which is
  * meaningless.
  */
-public class SeriesAggregationScanNode extends SourceNode {
+public class SeriesAggregationScanNode extends SeriesAggregationSourceNode {
 
   // The path of the target series which will be aggregated.
   private final MeasurementPath seriesPath;
 
-  // The list of aggregate functions, each AggregateDescriptor will be output as one column in
-  // result TsBlock
-  private List<AggregationDescriptor> aggregationDescriptorList;
-
   // The order to traverse the data.
   // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
   // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
@@ -87,10 +84,8 @@ public class SeriesAggregationScanNode extends SourceNode {
       PlanNodeId id,
       MeasurementPath seriesPath,
       List<AggregationDescriptor> aggregationDescriptorList) {
-    super(id);
+    super(id, AggregationNode.getDeduplicatedDescriptors(aggregationDescriptorList));
     this.seriesPath = seriesPath;
-    this.aggregationDescriptorList =
-        AggregationNode.getDeduplicatedDescriptors(aggregationDescriptorList);
   }
 
   public SeriesAggregationScanNode(
@@ -139,10 +134,6 @@ public class SeriesAggregationScanNode extends SourceNode {
     return seriesPath;
   }
 
-  public List<AggregationDescriptor> getAggregationDescriptorList() {
-    return aggregationDescriptorList;
-  }
-
   @Override
   public List<PlanNode> getChildren() {
     return ImmutableList.of();
@@ -178,10 +169,6 @@ public class SeriesAggregationScanNode extends SourceNode {
         .collect(Collectors.toList());
   }
 
-  public void setAggregationDescriptorList(List<AggregationDescriptor> aggregationDescriptorList) {
-    this.aggregationDescriptorList = aggregationDescriptorList;
-  }
-
   @Override
   public void open() throws Exception {}
 
@@ -289,4 +276,14 @@ public class SeriesAggregationScanNode extends SourceNode {
         groupByTimeParameter,
         regionReplicaSet);
   }
+
+  @Override
+  public PartialPath getPartitionPath() {
+    return seriesPath;
+  }
+
+  @Override
+  public Filter getPartitionTimeFilter() {
+    return timeFilter;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationSourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationSourceNode.java
new file mode 100644
index 0000000000..c7701848bc
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationSourceNode.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
+
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+
+import java.util.List;
+
+public abstract class SeriesAggregationSourceNode extends SeriesSourceNode{
+
+  // The list of aggregate functions, each AggregateDescriptor will be output as one column in
+  // result TsBlock
+  protected List<AggregationDescriptor> aggregationDescriptorList;
+
+  public SeriesAggregationSourceNode(PlanNodeId id, List<AggregationDescriptor> aggregationDescriptorList) {
+    super(id);
+    this.aggregationDescriptorList = aggregationDescriptorList;
+  }
+
+  public List<AggregationDescriptor> getAggregationDescriptorList() {
+    return aggregationDescriptorList;
+  }
+
+  public void setAggregationDescriptorList(List<AggregationDescriptor> aggregationDescriptorList) {
+    this.aggregationDescriptorList = aggregationDescriptorList;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
index 74fd995005..cc10983355 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
@@ -122,6 +122,10 @@ public class AggregationDescriptor {
     this.step = step;
   }
 
+  public AggregationDescriptor deepClone() {
+    return new AggregationDescriptor(this.getAggregationType(), this.step, this.getInputExpressions());
+  }
+
   public void serialize(ByteBuffer byteBuffer) {
     ReadWriteIOUtils.write(aggregationType.ordinal(), byteBuffer);
     step.serialize(byteBuffer);