You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/18 09:49:55 UTC

[iotdb] branch master updated: Add distribution plan logic for AlignedSeriesScan (#5941)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f78e90fdbd Add distribution plan logic for AlignedSeriesScan (#5941)
f78e90fdbd is described below

commit f78e90fdbdaaaae1ee5262955ccc861b91f8a785
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Wed May 18 17:49:49 2022 +0800

    Add distribution plan logic for AlignedSeriesScan (#5941)
---
 .../db/mpp/plan/planner/DistributionPlanner.java   | 53 ++++++++++++++++++----
 .../plan/node/source/AlignedSeriesScanNode.java    | 12 ++++-
 2 files changed, 54 insertions(+), 11 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
index f0eb1fb387..ef0a8b96cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
@@ -43,8 +43,10 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
@@ -234,16 +236,10 @@ public class DistributionPlanner {
       return timeJoinNode;
     }
 
-    @Override
     public PlanNode visitSeriesAggregationScan(
         SeriesAggregationScanNode node, DistributionPlanContext context) {
       List<TRegionReplicaSet> dataDistribution =
           analysis.getPartitionInfo(node.getSeriesPath(), node.getTimeFilter());
-      if (dataDistribution.size() == 1) {
-        node.setRegionReplicaSet(dataDistribution.get(0));
-        return node;
-      }
-
       List<AggregationDescriptor> leafAggDescriptorList = new ArrayList<>();
       node.getAggregationDescriptorList()
           .forEach(
@@ -279,6 +275,26 @@ public class DistributionPlanner {
       return aggregationNode;
     }
 
+    @Override
+    public PlanNode visitAlignedSeriesScan(
+        AlignedSeriesScanNode node, DistributionPlanContext context) {
+      List<TRegionReplicaSet> dataDistribution =
+          analysis.getPartitionInfo(node.getAlignedPath(), node.getTimeFilter());
+      if (dataDistribution.size() == 1) {
+        node.setRegionReplicaSet(dataDistribution.get(0));
+        return node;
+      }
+      TimeJoinNode timeJoinNode =
+          new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder());
+      for (TRegionReplicaSet dataRegion : dataDistribution) {
+        AlignedSeriesScanNode split = (AlignedSeriesScanNode) node.clone();
+        split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+        split.setRegionReplicaSet(dataRegion);
+        timeJoinNode.addChild(split);
+      }
+      return timeJoinNode;
+    }
+
     @Override
     public PlanNode visitSchemaFetchMerge(
         SchemaFetchMergeNode node, DistributionPlanContext context) {
@@ -314,7 +330,7 @@ public class DistributionPlanner {
 
       // Step 1: Get all source nodes. For the node which is not source, add it as the child of
       // current TimeJoinNode
-      List<SeriesScanNode> sources = new ArrayList<>();
+      List<SourceNode> sources = new ArrayList<>();
       for (PlanNode child : node.getChildren()) {
         if (child instanceof SeriesScanNode) {
           // If the child is SeriesScanNode, we need to check whether this node should be seperated
@@ -330,6 +346,18 @@ public class DistributionPlanner {
             split.setRegionReplicaSet(dataRegion);
             sources.add(split);
           }
+        } else if (child instanceof AlignedSeriesScanNode) {
+          AlignedSeriesScanNode handle = (AlignedSeriesScanNode) child;
+          List<TRegionReplicaSet> dataDistribution =
+              analysis.getPartitionInfo(handle.getAlignedPath(), handle.getTimeFilter());
+          // If the size of dataDistribution is m, this AlignedSeriesScanNode should be separated
+          // into m AlignedSeriesScanNodes.
+          for (TRegionReplicaSet dataRegion : dataDistribution) {
+            AlignedSeriesScanNode split = (AlignedSeriesScanNode) handle.clone();
+            split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+            split.setRegionReplicaSet(dataRegion);
+            sources.add(split);
+          }
         } else if (child instanceof SeriesAggregationScanNode) {
           // TODO: (xingtanzjr) We should do the same thing for SeriesAggregateScanNode. Consider to
           // make SeriesAggregateScanNode
@@ -344,8 +372,8 @@ public class DistributionPlanner {
       }
 
       // Step 2: For the source nodes, group them by the DataRegion.
-      Map<TRegionReplicaSet, List<SeriesScanNode>> sourceGroup =
-          sources.stream().collect(Collectors.groupingBy(SeriesScanNode::getRegionReplicaSet));
+      Map<TRegionReplicaSet, List<SourceNode>> sourceGroup =
+          sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
       // Step 3: For the source nodes which belong to same data region, add a TimeJoinNode for them
       // and make the
       // new TimeJoinNode as the child of current TimeJoinNode
@@ -481,6 +509,13 @@ public class DistributionPlanner {
     }
 
     @Override
+    public PlanNode visitAlignedSeriesScan(AlignedSeriesScanNode node, NodeGroupContext context) {
+      context.putNodeDistribution(
+          node.getPlanNodeId(),
+          new NodeDistribution(NodeDistributionType.NO_CHILD, node.getRegionReplicaSet()));
+      return node.clone();
+    }
+
     public PlanNode visitSeriesAggregationScan(
         SeriesAggregationScanNode node, NodeGroupContext context) {
       context.putNodeDistribution(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java
index 749c653e0b..72c2a24244 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java
@@ -129,11 +129,13 @@ public class AlignedSeriesScanNode extends SourceNode {
 
   @Override
   public TRegionReplicaSet getRegionReplicaSet() {
-    return null;
+    return regionReplicaSet;
   }
 
   @Override
-  public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {}
+  public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {
+    this.regionReplicaSet = regionReplicaSet;
+  }
 
   @Override
   public void close() throws Exception {}
@@ -265,4 +267,10 @@ public class AlignedSeriesScanNode extends SourceNode {
         offset,
         regionReplicaSet);
   }
+
+  public String toString() {
+    return String.format(
+        "AlignedSeriesScanNode-%s:[SeriesPath: %s, DataRegion: %s]",
+        this.getPlanNodeId(), this.getAlignedPath(), this.getRegionReplicaSet());
+  }
 }