You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/16 07:58:03 UTC
[iotdb] 01/01: complete the compatibility for AlignedSeriesScan in DistributionPlanner
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/distribution_plan_0516
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 083e7240538755b8704564a7dafcc3a9877eb06c
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon May 16 15:57:50 2022 +0800
complete the compatibility for AlignedSeriesScan in DistributionPlanner
---
.../db/mpp/plan/planner/DistributionPlanner.java | 52 ++++++++++++++++++++--
.../plan/node/source/AlignedSeriesScanNode.java | 12 ++++-
2 files changed, 59 insertions(+), 5 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
index 775601c0b6..5e002ac83b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.SimplePlanNodeRewriter;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
@@ -42,8 +43,10 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryS
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
import java.util.ArrayList;
@@ -88,8 +91,11 @@ public class DistributionPlanner {
}
public DistributedQueryPlan planFragments() {
+ PlanNodeUtil.printPlanNode(this.logicalPlan.getRootNode());
PlanNode rootAfterRewrite = rewriteSource();
+ PlanNodeUtil.printPlanNode(rootAfterRewrite);
PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
+ PlanNodeUtil.printPlanNode(rootWithExchange);
if (analysis.getStatement() instanceof QueryStatement) {
analysis
.getRespDatasetHeader()
@@ -231,6 +237,26 @@ public class DistributionPlanner {
return timeJoinNode;
}
+ @Override
+ public PlanNode visitAlignedSeriesScan(
+ AlignedSeriesScanNode node, DistributionPlanContext context) {
+ List<TRegionReplicaSet> dataDistribution =
+ analysis.getPartitionInfo(node.getAlignedPath(), node.getTimeFilter());
+ if (dataDistribution.size() == 1) {
+ node.setRegionReplicaSet(dataDistribution.get(0));
+ return node;
+ }
+ TimeJoinNode timeJoinNode =
+ new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder());
+ for (TRegionReplicaSet dataRegion : dataDistribution) {
+ AlignedSeriesScanNode split = (AlignedSeriesScanNode) node.clone();
+ split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+ split.setRegionReplicaSet(dataRegion);
+ timeJoinNode.addChild(split);
+ }
+ return timeJoinNode;
+ }
+
@Override
public PlanNode visitSchemaFetchMerge(
SchemaFetchMergeNode node, DistributionPlanContext context) {
@@ -266,7 +292,7 @@ public class DistributionPlanner {
// Step 1: Get all source nodes. For the node which is not source, add it as the child of
// current TimeJoinNode
- List<SeriesScanNode> sources = new ArrayList<>();
+ List<SourceNode> sources = new ArrayList<>();
for (PlanNode child : node.getChildren()) {
if (child instanceof SeriesScanNode) {
// If the child is SeriesScanNode, we need to check whether this node should be seperated
@@ -282,6 +308,18 @@ public class DistributionPlanner {
split.setRegionReplicaSet(dataRegion);
sources.add(split);
}
+ } else if (child instanceof AlignedSeriesScanNode) {
+ AlignedSeriesScanNode handle = (AlignedSeriesScanNode) child;
+ List<TRegionReplicaSet> dataDistribution =
+ analysis.getPartitionInfo(handle.getAlignedPath(), handle.getTimeFilter());
+ // If the size of dataDistribution is m, this AlignedSeriesScanNode should be separated into m
+ // AlignedSeriesScanNodes.
+ for (TRegionReplicaSet dataRegion : dataDistribution) {
+ AlignedSeriesScanNode split = (AlignedSeriesScanNode) handle.clone();
+ split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+ split.setRegionReplicaSet(dataRegion);
+ sources.add(split);
+ }
} else if (child instanceof SeriesAggregationScanNode) {
// TODO: (xingtanzjr) We should do the same thing for SeriesAggregateScanNode. Consider to
// make SeriesAggregateScanNode
@@ -296,8 +334,8 @@ public class DistributionPlanner {
}
// Step 2: For the source nodes, group them by the DataRegion.
- Map<TRegionReplicaSet, List<SeriesScanNode>> sourceGroup =
- sources.stream().collect(Collectors.groupingBy(SeriesScanNode::getRegionReplicaSet));
+ Map<TRegionReplicaSet, List<SourceNode>> sourceGroup =
+ sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
// Step 3: For the source nodes which belong to same data region, add a TimeJoinNode for them
// and make the
// new TimeJoinNode as the child of current TimeJoinNode
@@ -432,6 +470,14 @@ public class DistributionPlanner {
return node.clone();
}
+ @Override
+ public PlanNode visitAlignedSeriesScan(AlignedSeriesScanNode node, NodeGroupContext context) {
+ context.putNodeDistribution(
+ node.getPlanNodeId(),
+ new NodeDistribution(NodeDistributionType.NO_CHILD, node.getRegionReplicaSet()));
+ return node.clone();
+ }
+
@Override
public PlanNode visitSeriesAggregate(SeriesAggregationScanNode node, NodeGroupContext context) {
context.putNodeDistribution(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java
index 749c653e0b..72c2a24244 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java
@@ -129,11 +129,13 @@ public class AlignedSeriesScanNode extends SourceNode {
@Override
public TRegionReplicaSet getRegionReplicaSet() {
- return null;
+ return regionReplicaSet;
}
@Override
- public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {}
+ public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {
+ this.regionReplicaSet = regionReplicaSet;
+ }
@Override
public void close() throws Exception {}
@@ -265,4 +267,10 @@ public class AlignedSeriesScanNode extends SourceNode {
offset,
regionReplicaSet);
}
+
+ public String toString() {
+ return String.format(
+ "AlignedSeriesScanNode-%s:[SeriesPath: %s, DataRegion: %s]",
+ this.getPlanNodeId(), this.getAlignedPath(), this.getRegionReplicaSet());
+ }
}