You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/18 17:40:46 UTC
[iotdb] 02/03: complete basic timejoin aggregation
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/agg_distribution_plan
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3ce2308d23b84410b042b9e821e67df52d12781c
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu May 19 00:43:38 2022 +0800
complete basic timejoin aggregation
---
.../db/mpp/plan/planner/DistributionPlanner.java | 135 +++++++++++++++++----
.../db/mpp/plan/planner/plan/node/PlanVisitor.java | 4 +
.../planner/plan/node/process/AggregationNode.java | 10 +-
.../planner/plan/node/process/MultiChildNode.java | 39 ++++++
.../planner/plan/node/process/TimeJoinNode.java | 15 +--
.../source/AlignedSeriesAggregationScanNode.java | 24 ++--
.../node/source/SeriesAggregationScanNode.java | 29 ++---
.../node/source/SeriesAggregationSourceNode.java | 45 +++++++
.../plan/parameter/AggregationDescriptor.java | 4 +
9 files changed, 234 insertions(+), 71 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
index 9df8f13875..ff664d93fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.plan.planner;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
@@ -41,16 +42,20 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryM
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesSourceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
+import org.checkerframework.checker.units.qual.A;
import java.util.ArrayList;
import java.util.Collections;
@@ -327,8 +332,14 @@ public class DistributionPlanner {
@Override
public PlanNode visitTimeJoin(TimeJoinNode node, DistributionPlanContext context) {
- TimeJoinNode root = (TimeJoinNode) node.clone();
+ // Although some logic is similar between Aggregation and RawDataQuery,
+ // we still use separate method to process the distribution planning now
+ // to make the planning procedure more clear
+ if (isAggregationQuery(node)) {
+ return planAggregationWithTimeJoin(node, context);
+ }
+ TimeJoinNode root = (TimeJoinNode) node.clone();
// Step 1: Get all source nodes. For the node which is not source, add it as the child of
// current TimeJoinNode
List<SourceNode> sources = new ArrayList<>();
@@ -347,34 +358,12 @@ public class DistributionPlanner {
split.setRegionReplicaSet(dataRegion);
sources.add(split);
}
- } else if (child instanceof AlignedSeriesScanNode) {
- AlignedSeriesScanNode handle = (AlignedSeriesScanNode) child;
- List<TRegionReplicaSet> dataDistribution =
- analysis.getPartitionInfo(handle.getAlignedPath(), handle.getTimeFilter());
- // If the size of dataDistribution is m, this SeriesScanNode should be seperated into m
- // SeriesScanNode.
- for (TRegionReplicaSet dataRegion : dataDistribution) {
- AlignedSeriesScanNode split = (AlignedSeriesScanNode) handle.clone();
- split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
- split.setRegionReplicaSet(dataRegion);
- sources.add(split);
- }
- } else if (child instanceof SeriesAggregationScanNode) {
- // TODO: (xingtanzjr) We should do the same thing for SeriesAggregateScanNode. Consider to
- // make SeriesAggregateScanNode
- // and SeriesScanNode to derived from the same parent Class because they have similar
- // process logic in many scenarios
- } else {
- // In a general logical query plan, the children of TimeJoinNode should only be
- // SeriesScanNode or SeriesAggregateScanNode
- // So this branch should not be touched.
- root.addChild(visit(child, context));
}
}
-
// Step 2: For the source nodes, group them by the DataRegion.
Map<TRegionReplicaSet, List<SourceNode>> sourceGroup =
sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
+
// Step 3: For the source nodes which belong to same data region, add a TimeJoinNode for them
// and make the
// new TimeJoinNode as the child of current TimeJoinNode
@@ -392,16 +381,102 @@ public class DistributionPlanner {
// We clone a TimeJoinNode from root to make the params to be consistent.
// But we need to assign a new ID to it
TimeJoinNode parentOfGroup = (TimeJoinNode) root.clone();
- root.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+ parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
seriesScanNodes.forEach(parentOfGroup::addChild);
root.addChild(parentOfGroup);
}
}
});
+ // Process the other children which are not SeriesSourceNode
+ for (PlanNode child : node.getChildren()) {
+ if (!(child instanceof SeriesSourceNode)) {
+ // In a general logical query plan, the children of TimeJoinNode should only be
+ // SeriesScanNode or SeriesAggregateScanNode
+ // So this branch should not be touched.
+ root.addChild(visit(child, context));
+ }
+ }
+
return root;
}
+ private boolean isAggregationQuery(TimeJoinNode node) {
+ for (PlanNode child : node.getChildren()) {
+ if (child instanceof SeriesAggregationScanNode
+ || child instanceof AlignedSeriesAggregationScanNode) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private PlanNode planAggregationWithTimeJoin(
+ TimeJoinNode root,
+ DistributionPlanContext context) {
+
+ // Step 1: construct AggregationDescriptor for AggregationNode
+ List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
+ List<SeriesAggregationSourceNode> sources = new ArrayList<>();
+ Map<PartialPath, Integer> regionCountPerSeries = new HashMap<>();
+ for (PlanNode child : root.getChildren()) {
+ SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode) child;
+ handle.getAggregationDescriptorList()
+ .forEach(
+ descriptor -> {
+ rootAggDescriptorList.add(
+ new AggregationDescriptor(
+ descriptor.getAggregationType(),
+ AggregationStep.FINAL,
+ descriptor.getInputExpressions()));
+ });
+ List<TRegionReplicaSet> dataDistribution =
+ analysis.getPartitionInfo(handle.getPartitionPath(), handle.getPartitionTimeFilter());
+ for (TRegionReplicaSet dataRegion : dataDistribution) {
+ SeriesAggregationSourceNode split = (SeriesAggregationSourceNode) handle.clone();
+ split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+ split.setRegionReplicaSet(dataRegion);
+ // Let each split reference different object of AggregationDescriptorList
+ split.setAggregationDescriptorList(handle.getAggregationDescriptorList().stream().map(AggregationDescriptor::deepClone).collect(Collectors.toList()));
+ sources.add(split);
+ }
+ regionCountPerSeries.put(handle.getPartitionPath(), dataDistribution.size());
+ }
+
+ // Step 2: change the step for each SeriesAggregationSourceNode according to its split count
+ for (SeriesAggregationSourceNode node : sources) {
+ boolean isFinal = regionCountPerSeries.get(node.getPartitionPath()) == 1;
+ node.getAggregationDescriptorList().forEach(d -> d.setStep(isFinal ? AggregationStep.FINAL : AggregationStep.PARTIAL));
+ }
+
+ Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup =
+ sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
+
+ AggregationNode aggregationNode = new AggregationNode(context.queryContext.getQueryId().genPlanNodeId(), rootAggDescriptorList);
+
+ final boolean[] addParent = {false};
+ sourceGroup.forEach(
+ (dataRegion, sourceNodes) -> {
+ if (sourceNodes.size() == 1) {
+ aggregationNode.addChild(sourceNodes.get(0));
+ } else {
+ if (!addParent[0]) {
+ sourceNodes.forEach(aggregationNode::addChild);
+ addParent[0] = true;
+ } else {
+ // We clone a TimeJoinNode from root to make the params to be consistent.
+ // But we need to assign a new ID to it
+ TimeJoinNode parentOfGroup = (TimeJoinNode) root.clone();
+ parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+ sourceNodes.forEach(parentOfGroup::addChild);
+ aggregationNode.addChild(parentOfGroup);
+ }
+ }
+ });
+
+ return aggregationNode;
+ }
+
public PlanNode visit(PlanNode node, DistributionPlanContext context) {
return node.accept(this, context);
}
@@ -527,7 +602,15 @@ public class DistributionPlanner {
@Override
public PlanNode visitTimeJoin(TimeJoinNode node, NodeGroupContext context) {
- TimeJoinNode newNode = (TimeJoinNode) node.clone();
+ return processMultiChildNode(node, context);
+ }
+
+ public PlanNode visitAggregation(AggregationNode node, NodeGroupContext context) {
+ return processMultiChildNode(node, context);
+ }
+
+ private PlanNode processMultiChildNode(MultiChildNode node, NodeGroupContext context) {
+ MultiChildNode newNode = (MultiChildNode) node.clone();
List<PlanNode> visitedChildren = new ArrayList<>();
node.getChildren()
.forEach(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 4d552df27b..73df5ae27c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -74,6 +74,10 @@ public abstract class PlanVisitor<R, C> {
return visitPlan(node, context);
}
+ public R visitAggregation(AggregationNode node, C context) {
+ return visitPlan(node, context);
+ }
+
public R visitAlignedSeriesScan(AlignedSeriesScanNode node, C context) {
return visitPlan(node, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
index e02958ea62..e51045fb97 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
@@ -42,7 +42,7 @@ import java.util.stream.Collectors;
* input as a TsBlock, it may be raw data or partial aggregation result. This node will output the
* final series aggregated result represented by TsBlock.
*/
-public class AggregationNode extends ProcessNode {
+public class AggregationNode extends MultiChildNode {
// The list of aggregate functions, each AggregateDescriptor will be output as one column of
// result TsBlock
@@ -53,8 +53,6 @@ public class AggregationNode extends ProcessNode {
@Nullable protected GroupByTimeParameter groupByTimeParameter;
protected OrderBy scanOrder = OrderBy.TIMESTAMP_ASC;
- protected List<PlanNode> children;
-
public AggregationNode(
PlanNodeId id,
List<PlanNode> children,
@@ -67,8 +65,7 @@ public class AggregationNode extends ProcessNode {
List<PlanNode> children,
List<AggregationDescriptor> aggregationDescriptorList,
@Nullable GroupByTimeParameter groupByTimeParameter) {
- super(id);
- this.children = children;
+ super(id, children);
this.aggregationDescriptorList = getDeduplicatedDescriptors(aggregationDescriptorList);
this.groupByTimeParameter = groupByTimeParameter;
}
@@ -81,10 +78,9 @@ public class AggregationNode extends ProcessNode {
PlanNodeId id,
List<AggregationDescriptor> aggregationDescriptorList,
@Nullable GroupByTimeParameter groupByTimeParameter) {
- super(id);
+ super(id, new ArrayList<>());
this.aggregationDescriptorList = getDeduplicatedDescriptors(aggregationDescriptorList);
this.groupByTimeParameter = groupByTimeParameter;
- this.children = new ArrayList<>();
}
public List<AggregationDescriptor> getAggregationDescriptorList() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java
new file mode 100644
index 0000000000..61da38a121
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
+
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+
+import java.util.List;
+
+public abstract class MultiChildNode extends ProcessNode {
+
+ protected List<PlanNode> children;
+
+ public MultiChildNode(PlanNodeId id, List<PlanNode> children) {
+ super(id);
+ this.children = children;
+ }
+
+ public void setChildren(List<PlanNode> children) {
+ this.children = children;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
index a39d0af54f..f081270e2b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
@@ -36,26 +36,19 @@ import java.util.stream.Collectors;
* timestamp column. It will join two or more TsBlock by Timestamp column. The output result of
* TimeJoinOperator is sorted by timestamp
*/
-public class TimeJoinNode extends ProcessNode {
+public class TimeJoinNode extends MultiChildNode {
// This parameter indicates the order when executing multiway merge sort.
private final OrderBy mergeOrder;
- private List<PlanNode> children;
-
public TimeJoinNode(PlanNodeId id, OrderBy mergeOrder) {
- super(id);
+ super(id, new ArrayList<>());
this.mergeOrder = mergeOrder;
- this.children = new ArrayList<>();
}
public TimeJoinNode(PlanNodeId id, OrderBy mergeOrder, List<PlanNode> children) {
- this(id, mergeOrder);
- this.children = children;
- }
-
- public void setChildren(List<PlanNode> children) {
- this.children = children;
+ super(id, children);
+ this.mergeOrder = mergeOrder;
}
public OrderBy getMergeOrder() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
index c57e6692b5..db9c3fe354 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
@@ -44,15 +45,11 @@ import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
-public class AlignedSeriesAggregationScanNode extends SourceNode {
+public class AlignedSeriesAggregationScanNode extends SeriesAggregationSourceNode {
// The paths of the target series which will be aggregated.
private final AlignedPath alignedPath;
- // The list of aggregate functions, each AggregateDescriptor will be output as one column in
- // result TsBlock
- private final List<AggregationDescriptor> aggregationDescriptorList;
-
// The order to traverse the data.
// Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
// The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
@@ -72,9 +69,8 @@ public class AlignedSeriesAggregationScanNode extends SourceNode {
PlanNodeId id,
AlignedPath alignedPath,
List<AggregationDescriptor> aggregationDescriptorList) {
- super(id);
+ super(id, aggregationDescriptorList);
this.alignedPath = alignedPath;
- this.aggregationDescriptorList = aggregationDescriptorList;
}
public AlignedSeriesAggregationScanNode(
@@ -105,10 +101,6 @@ public class AlignedSeriesAggregationScanNode extends SourceNode {
return alignedPath;
}
- public List<AggregationDescriptor> getAggregationDescriptorList() {
- return aggregationDescriptorList;
- }
-
public OrderBy getScanOrder() {
return scanOrder;
}
@@ -270,4 +262,14 @@ public class AlignedSeriesAggregationScanNode extends SourceNode {
groupByTimeParameter,
regionReplicaSet);
}
+
+ @Override
+ public PartialPath getPartitionPath() {
+ return alignedPath;
+ }
+
+ @Override
+ public Filter getPartitionTimeFilter() {
+ return timeFilter;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
index ced35c4fb4..f30caee4c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
@@ -59,15 +60,11 @@ import java.util.stream.Collectors;
* represent the whole aggregation result of this series. And the timestamp will be 0, which is
* meaningless.
*/
-public class SeriesAggregationScanNode extends SourceNode {
+public class SeriesAggregationScanNode extends SeriesAggregationSourceNode {
// The path of the target series which will be aggregated.
private final MeasurementPath seriesPath;
- // The list of aggregate functions, each AggregateDescriptor will be output as one column in
- // result TsBlock
- private List<AggregationDescriptor> aggregationDescriptorList;
-
// The order to traverse the data.
// Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
// The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
@@ -87,10 +84,8 @@ public class SeriesAggregationScanNode extends SourceNode {
PlanNodeId id,
MeasurementPath seriesPath,
List<AggregationDescriptor> aggregationDescriptorList) {
- super(id);
+ super(id, AggregationNode.getDeduplicatedDescriptors(aggregationDescriptorList));
this.seriesPath = seriesPath;
- this.aggregationDescriptorList =
- AggregationNode.getDeduplicatedDescriptors(aggregationDescriptorList);
}
public SeriesAggregationScanNode(
@@ -139,10 +134,6 @@ public class SeriesAggregationScanNode extends SourceNode {
return seriesPath;
}
- public List<AggregationDescriptor> getAggregationDescriptorList() {
- return aggregationDescriptorList;
- }
-
@Override
public List<PlanNode> getChildren() {
return ImmutableList.of();
@@ -178,10 +169,6 @@ public class SeriesAggregationScanNode extends SourceNode {
.collect(Collectors.toList());
}
- public void setAggregationDescriptorList(List<AggregationDescriptor> aggregationDescriptorList) {
- this.aggregationDescriptorList = aggregationDescriptorList;
- }
-
@Override
public void open() throws Exception {}
@@ -289,4 +276,14 @@ public class SeriesAggregationScanNode extends SourceNode {
groupByTimeParameter,
regionReplicaSet);
}
+
+ @Override
+ public PartialPath getPartitionPath() {
+ return seriesPath;
+ }
+
+ @Override
+ public Filter getPartitionTimeFilter() {
+ return timeFilter;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationSourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationSourceNode.java
new file mode 100644
index 0000000000..c7701848bc
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationSourceNode.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
+
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+
+import java.util.List;
+
+public abstract class SeriesAggregationSourceNode extends SeriesSourceNode{
+
+ // The list of aggregate functions, each AggregateDescriptor will be output as one column in
+ // result TsBlock
+ protected List<AggregationDescriptor> aggregationDescriptorList;
+
+ public SeriesAggregationSourceNode(PlanNodeId id, List<AggregationDescriptor> aggregationDescriptorList) {
+ super(id);
+ this.aggregationDescriptorList = aggregationDescriptorList;
+ }
+
+ public List<AggregationDescriptor> getAggregationDescriptorList() {
+ return aggregationDescriptorList;
+ }
+
+ public void setAggregationDescriptorList(List<AggregationDescriptor> aggregationDescriptorList) {
+ this.aggregationDescriptorList = aggregationDescriptorList;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
index 74fd995005..cc10983355 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
@@ -122,6 +122,10 @@ public class AggregationDescriptor {
this.step = step;
}
+ public AggregationDescriptor deepClone() {
+ return new AggregationDescriptor(this.getAggregationType(), this.step, this.getInputExpressions());
+ }
+
public void serialize(ByteBuffer byteBuffer) {
ReadWriteIOUtils.write(aggregationType.ordinal(), byteBuffer);
step.serialize(byteBuffer);