You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/27 04:28:52 UTC
[iotdb] branch xingtanzjr/agg_groupbytime updated: add groupbytime parameter and scanorder when distribution planning
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/agg_groupbytime
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/xingtanzjr/agg_groupbytime by this push:
new 231e09e018 add groupbytime parameter and scanorder when distribution planning
231e09e018 is described below
commit 231e09e018d007b3cde4bc74a2b36d292418aae1
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri May 27 12:28:42 2022 +0800
add groupbytime parameter and scanorder when distribution planning
---
.../plan/planner/distribution/SourceRewriter.java | 13 +++++++--
.../planner/plan/node/process/AggregationNode.java | 5 ----
.../source/AlignedSeriesAggregationScanNode.java | 12 ---------
.../node/source/SeriesAggregationScanNode.java | 12 ---------
.../node/source/SeriesAggregationSourceNode.java | 31 ++++++++++++++++++++++
5 files changed, 42 insertions(+), 31 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 65d4d6552f..72c1819c80 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -321,7 +321,10 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
AggregationNode aggregationNode =
new AggregationNode(
- context.queryContext.getQueryId().genPlanNodeId(), rootAggDescriptorList);
+ context.queryContext.getQueryId().genPlanNodeId(),
+ rootAggDescriptorList,
+ node.getGroupByTimeParameter(),
+ node.getScanOrder());
for (TRegionReplicaSet dataRegion : dataDistribution) {
SeriesAggregationScanNode split = (SeriesAggregationScanNode) node.clone();
split.setAggregationDescriptorList(leafAggDescriptorList);
@@ -470,9 +473,15 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
descriptor.getInputExpressions()));
});
}
+ checkArgument(
+ sources.size() > 0, "Aggregation sources should not be empty when distribution planning");
+ SeriesAggregationSourceNode seed = sources.get(0);
AggregationNode aggregationNode =
new AggregationNode(
- context.queryContext.getQueryId().genPlanNodeId(), rootAggDescriptorList);
+ context.queryContext.getQueryId().genPlanNodeId(),
+ rootAggDescriptorList,
+ seed.getGroupByTimeParameter(),
+ seed.getScanOrder());
final boolean[] addParent = {false};
sourceGroup.forEach(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
index bfaf165b36..f2286dc21c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
@@ -76,11 +76,6 @@ public class AggregationNode extends MultiChildNode {
this.children = children;
}
- @Deprecated
- public AggregationNode(PlanNodeId id, List<AggregationDescriptor> aggregationDescriptorList) {
- this(id, aggregationDescriptorList, null, OrderBy.TIMESTAMP_ASC);
- }
-
public List<AggregationDescriptor> getAggregationDescriptorList() {
return aggregationDescriptorList;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
index 8afe5104b6..050597073b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
@@ -50,18 +50,6 @@ public class AlignedSeriesAggregationScanNode extends SeriesAggregationSourceNod
// The paths of the target series which will be aggregated.
private final AlignedPath alignedPath;
- // The order to traverse the data.
- // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
- // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
- private OrderBy scanOrder = OrderBy.TIMESTAMP_ASC;
-
- // time filter for current series, could be null if doesn't exist
- @Nullable private Filter timeFilter;
-
- // The parameter of `group by time`
- // Its value will be null if there is no `group by time` clause,
- @Nullable private GroupByTimeParameter groupByTimeParameter;
-
// The id of DataRegion where the node will run
private TRegionReplicaSet regionReplicaSet;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
index 6a77e02ae0..77b0eff6d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
@@ -64,18 +64,6 @@ public class SeriesAggregationScanNode extends SeriesAggregationSourceNode {
// The path of the target series which will be aggregated.
private final MeasurementPath seriesPath;
- // The order to traverse the data.
- // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
- // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
- private OrderBy scanOrder = OrderBy.TIMESTAMP_ASC;
-
- // time filter for current series, could be null if doesn't exist
- @Nullable private Filter timeFilter;
-
- // The parameter of `group by time`
- // Its value will be null if there is no `group by time` clause,
- @Nullable private GroupByTimeParameter groupByTimeParameter;
-
// The id of DataRegion where the node will run
private TRegionReplicaSet regionReplicaSet;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationSourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationSourceNode.java
index f9e1200ea6..d0087b2303 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationSourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationSourceNode.java
@@ -21,6 +21,11 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import javax.annotation.Nullable;
import java.util.List;
@@ -30,6 +35,18 @@ public abstract class SeriesAggregationSourceNode extends SeriesSourceNode {
// result TsBlock
protected List<AggregationDescriptor> aggregationDescriptorList;
+ // The order to traverse the data.
+ // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
+ // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
+ protected OrderBy scanOrder = OrderBy.TIMESTAMP_ASC;
+
+ // Time filter for the current series; null if no time filter exists.
+ @Nullable protected Filter timeFilter;
+
+ // The parameter of `group by time`.
+ // Its value is null if there is no `group by time` clause.
+ @Nullable protected GroupByTimeParameter groupByTimeParameter;
+
public SeriesAggregationSourceNode(
PlanNodeId id, List<AggregationDescriptor> aggregationDescriptorList) {
super(id);
@@ -43,4 +60,18 @@ public abstract class SeriesAggregationSourceNode extends SeriesSourceNode {
public void setAggregationDescriptorList(List<AggregationDescriptor> aggregationDescriptorList) {
this.aggregationDescriptorList = aggregationDescriptorList;
}
+
+ public OrderBy getScanOrder() {
+ return scanOrder;
+ }
+
+ @Nullable
+ public Filter getTimeFilter() {
+ return timeFilter;
+ }
+
+ @Nullable
+ public GroupByTimeParameter getGroupByTimeParameter() {
+ return groupByTimeParameter;
+ }
}