You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/16 07:58:03 UTC

[iotdb] 01/01: complete the compatibility for AlignedSeriesScan in DistributionPlanner

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/distribution_plan_0516
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 083e7240538755b8704564a7dafcc3a9877eb06c
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon May 16 15:57:50 2022 +0800

    complete the compatibility for AlignedSeriesScan in DistributionPlanner
---
 .../db/mpp/plan/planner/DistributionPlanner.java   | 52 ++++++++++++++++++++--
 .../plan/node/source/AlignedSeriesScanNode.java    | 12 ++++-
 2 files changed, 59 insertions(+), 5 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
index 775601c0b6..5e002ac83b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
 import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.SimplePlanNodeRewriter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
@@ -42,8 +43,10 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryS
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
 
 import java.util.ArrayList;
@@ -88,8 +91,11 @@ public class DistributionPlanner {
   }
 
   public DistributedQueryPlan planFragments() {
+    PlanNodeUtil.printPlanNode(this.logicalPlan.getRootNode());
     PlanNode rootAfterRewrite = rewriteSource();
+    PlanNodeUtil.printPlanNode(rootAfterRewrite);
     PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
+    PlanNodeUtil.printPlanNode(rootWithExchange);
     if (analysis.getStatement() instanceof QueryStatement) {
       analysis
           .getRespDatasetHeader()
@@ -231,6 +237,26 @@ public class DistributionPlanner {
       return timeJoinNode;
     }
 
+    @Override
+    public PlanNode visitAlignedSeriesScan(
+        AlignedSeriesScanNode node, DistributionPlanContext context) {
+      List<TRegionReplicaSet> dataDistribution =
+          analysis.getPartitionInfo(node.getAlignedPath(), node.getTimeFilter());
+      if (dataDistribution.size() == 1) {
+        node.setRegionReplicaSet(dataDistribution.get(0));
+        return node;
+      }
+      TimeJoinNode timeJoinNode =
+          new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder());
+      for (TRegionReplicaSet dataRegion : dataDistribution) {
+        AlignedSeriesScanNode split = (AlignedSeriesScanNode) node.clone();
+        split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+        split.setRegionReplicaSet(dataRegion);
+        timeJoinNode.addChild(split);
+      }
+      return timeJoinNode;
+    }
+
     @Override
     public PlanNode visitSchemaFetchMerge(
         SchemaFetchMergeNode node, DistributionPlanContext context) {
@@ -266,7 +292,7 @@ public class DistributionPlanner {
 
       // Step 1: Get all source nodes. For the node which is not source, add it as the child of
       // current TimeJoinNode
-      List<SeriesScanNode> sources = new ArrayList<>();
+      List<SourceNode> sources = new ArrayList<>();
       for (PlanNode child : node.getChildren()) {
         if (child instanceof SeriesScanNode) {
           // If the child is SeriesScanNode, we need to check whether this node should be seperated
@@ -282,6 +308,18 @@ public class DistributionPlanner {
             split.setRegionReplicaSet(dataRegion);
             sources.add(split);
           }
+        } else if (child instanceof AlignedSeriesScanNode) {
+          AlignedSeriesScanNode handle = (AlignedSeriesScanNode) child;
+          List<TRegionReplicaSet> dataDistribution =
+              analysis.getPartitionInfo(handle.getAlignedPath(), handle.getTimeFilter());
+          // If the size of dataDistribution is m, this AlignedSeriesScanNode should be separated
+          // into m AlignedSeriesScanNodes, one per data region.
+          for (TRegionReplicaSet dataRegion : dataDistribution) {
+            AlignedSeriesScanNode split = (AlignedSeriesScanNode) handle.clone();
+            split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+            split.setRegionReplicaSet(dataRegion);
+            sources.add(split);
+          }
         } else if (child instanceof SeriesAggregationScanNode) {
           // TODO: (xingtanzjr) We should do the same thing for SeriesAggregateScanNode. Consider to
           // make SeriesAggregateScanNode
@@ -296,8 +334,8 @@ public class DistributionPlanner {
       }
 
       // Step 2: For the source nodes, group them by the DataRegion.
-      Map<TRegionReplicaSet, List<SeriesScanNode>> sourceGroup =
-          sources.stream().collect(Collectors.groupingBy(SeriesScanNode::getRegionReplicaSet));
+      Map<TRegionReplicaSet, List<SourceNode>> sourceGroup =
+          sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
       // Step 3: For the source nodes which belong to same data region, add a TimeJoinNode for them
       // and make the
       // new TimeJoinNode as the child of current TimeJoinNode
@@ -432,6 +470,14 @@ public class DistributionPlanner {
       return node.clone();
     }
 
+    @Override
+    public PlanNode visitAlignedSeriesScan(AlignedSeriesScanNode node, NodeGroupContext context) {
+      context.putNodeDistribution(
+          node.getPlanNodeId(),
+          new NodeDistribution(NodeDistributionType.NO_CHILD, node.getRegionReplicaSet()));
+      return node.clone();
+    }
+
     @Override
     public PlanNode visitSeriesAggregate(SeriesAggregationScanNode node, NodeGroupContext context) {
       context.putNodeDistribution(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java
index 749c653e0b..72c2a24244 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java
@@ -129,11 +129,13 @@ public class AlignedSeriesScanNode extends SourceNode {
 
   @Override
   public TRegionReplicaSet getRegionReplicaSet() {
-    return null;
+    return regionReplicaSet;
   }
 
   @Override
-  public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {}
+  public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {
+    this.regionReplicaSet = regionReplicaSet;
+  }
 
   @Override
   public void close() throws Exception {}
@@ -265,4 +267,10 @@ public class AlignedSeriesScanNode extends SourceNode {
         offset,
         regionReplicaSet);
   }
+
+  public String toString() {
+    return String.format(
+        "AlignedSeriesScanNode-%s:[SeriesPath: %s, DataRegion: %s]",
+        this.getPlanNodeId(), this.getAlignedPath(), this.getRegionReplicaSet());
+  }
 }