You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/18 09:49:55 UTC
[iotdb] branch master updated: Add distribution plan logic for AlignedSeriesScan (#5941)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f78e90fdbd Add distribution plan logic for AlignedSeriesScan (#5941)
f78e90fdbd is described below
commit f78e90fdbdaaaae1ee5262955ccc861b91f8a785
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Wed May 18 17:49:49 2022 +0800
Add distribution plan logic for AlignedSeriesScan (#5941)
---
.../db/mpp/plan/planner/DistributionPlanner.java | 53 ++++++++++++++++++----
.../plan/node/source/AlignedSeriesScanNode.java | 12 ++++-
2 files changed, 54 insertions(+), 11 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
index f0eb1fb387..ef0a8b96cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
@@ -43,8 +43,10 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
@@ -234,16 +236,10 @@ public class DistributionPlanner {
return timeJoinNode;
}
- @Override
public PlanNode visitSeriesAggregationScan(
SeriesAggregationScanNode node, DistributionPlanContext context) {
List<TRegionReplicaSet> dataDistribution =
analysis.getPartitionInfo(node.getSeriesPath(), node.getTimeFilter());
- if (dataDistribution.size() == 1) {
- node.setRegionReplicaSet(dataDistribution.get(0));
- return node;
- }
-
List<AggregationDescriptor> leafAggDescriptorList = new ArrayList<>();
node.getAggregationDescriptorList()
.forEach(
@@ -279,6 +275,26 @@ public class DistributionPlanner {
return aggregationNode;
}
+ @Override
+ public PlanNode visitAlignedSeriesScan(
+ AlignedSeriesScanNode node, DistributionPlanContext context) {
+ List<TRegionReplicaSet> dataDistribution =
+ analysis.getPartitionInfo(node.getAlignedPath(), node.getTimeFilter());
+ if (dataDistribution.size() == 1) {
+ node.setRegionReplicaSet(dataDistribution.get(0));
+ return node;
+ }
+ TimeJoinNode timeJoinNode =
+ new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder());
+ for (TRegionReplicaSet dataRegion : dataDistribution) {
+ AlignedSeriesScanNode split = (AlignedSeriesScanNode) node.clone();
+ split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+ split.setRegionReplicaSet(dataRegion);
+ timeJoinNode.addChild(split);
+ }
+ return timeJoinNode;
+ }
+
@Override
public PlanNode visitSchemaFetchMerge(
SchemaFetchMergeNode node, DistributionPlanContext context) {
@@ -314,7 +330,7 @@ public class DistributionPlanner {
// Step 1: Get all source nodes. For the node which is not source, add it as the child of
// current TimeJoinNode
- List<SeriesScanNode> sources = new ArrayList<>();
+ List<SourceNode> sources = new ArrayList<>();
for (PlanNode child : node.getChildren()) {
if (child instanceof SeriesScanNode) {
// If the child is SeriesScanNode, we need to check whether this node should be separated
@@ -330,6 +346,18 @@ public class DistributionPlanner {
split.setRegionReplicaSet(dataRegion);
sources.add(split);
}
+ } else if (child instanceof AlignedSeriesScanNode) {
+ AlignedSeriesScanNode handle = (AlignedSeriesScanNode) child;
+ List<TRegionReplicaSet> dataDistribution =
+ analysis.getPartitionInfo(handle.getAlignedPath(), handle.getTimeFilter());
+ // If the size of dataDistribution is m, this AlignedSeriesScanNode should be separated into m
+ // AlignedSeriesScanNodes, one for each data region.
+ for (TRegionReplicaSet dataRegion : dataDistribution) {
+ AlignedSeriesScanNode split = (AlignedSeriesScanNode) handle.clone();
+ split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+ split.setRegionReplicaSet(dataRegion);
+ sources.add(split);
+ }
} else if (child instanceof SeriesAggregationScanNode) {
// TODO: (xingtanzjr) We should do the same thing for SeriesAggregateScanNode. Consider to
// make SeriesAggregateScanNode
@@ -344,8 +372,8 @@ public class DistributionPlanner {
}
// Step 2: For the source nodes, group them by the DataRegion.
- Map<TRegionReplicaSet, List<SeriesScanNode>> sourceGroup =
- sources.stream().collect(Collectors.groupingBy(SeriesScanNode::getRegionReplicaSet));
+ Map<TRegionReplicaSet, List<SourceNode>> sourceGroup =
+ sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
// Step 3: For the source nodes which belong to same data region, add a TimeJoinNode for them
// and make the
// new TimeJoinNode as the child of current TimeJoinNode
@@ -481,6 +509,13 @@ public class DistributionPlanner {
}
@Override
+ public PlanNode visitAlignedSeriesScan(AlignedSeriesScanNode node, NodeGroupContext context) {
+ context.putNodeDistribution(
+ node.getPlanNodeId(),
+ new NodeDistribution(NodeDistributionType.NO_CHILD, node.getRegionReplicaSet()));
+ return node.clone();
+ }
+
public PlanNode visitSeriesAggregationScan(
SeriesAggregationScanNode node, NodeGroupContext context) {
context.putNodeDistribution(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java
index 749c653e0b..72c2a24244 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java
@@ -129,11 +129,13 @@ public class AlignedSeriesScanNode extends SourceNode {
@Override
public TRegionReplicaSet getRegionReplicaSet() {
- return null;
+ return regionReplicaSet;
}
@Override
- public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {}
+ public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {
+ this.regionReplicaSet = regionReplicaSet;
+ }
@Override
public void close() throws Exception {}
@@ -265,4 +267,10 @@ public class AlignedSeriesScanNode extends SourceNode {
offset,
regionReplicaSet);
}
+
+ public String toString() {
+ return String.format(
+ "AlignedSeriesScanNode-%s:[SeriesPath: %s, DataRegion: %s]",
+ this.getPlanNodeId(), this.getAlignedPath(), this.getRegionReplicaSet());
+ }
}