You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/19 08:08:44 UTC

[iotdb] branch xingtanzjr/agg_distribution_plan updated (82425d29ed -> 8da9a7b64c)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch xingtanzjr/agg_distribution_plan
in repository https://gitbox.apache.org/repos/asf/iotdb.git


    from 82425d29ed add more verifications for tests
     new c7364bef46 remove useless println
     new 8da9a7b64c complete basic GroupByLevel distribution

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../db/mpp/plan/planner/DistributionPlanner.java   | 158 +++++++++++++++++----
 .../plan/parameter/AggregationDescriptor.java      |   6 +-
 .../db/mpp/plan/plan/DistributionPlannerTest.java  |   5 -
 3 files changed, 135 insertions(+), 34 deletions(-)


[iotdb] 02/02: complete basic GroupByLevel distribution

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/agg_distribution_plan
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8da9a7b64c79646b534890a4b8172bcc594e8f3a
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu May 19 16:08:36 2022 +0800

    complete basic GroupByLevel distribution
---
 .../db/mpp/plan/planner/DistributionPlanner.java   | 158 +++++++++++++++++----
 .../plan/parameter/AggregationDescriptor.java      |   6 +-
 .../db/mpp/plan/plan/DistributionPlannerTest.java  |   1 -
 3 files changed, 135 insertions(+), 30 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
index 26ac5e456e..12d3bc8493 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.plan.planner;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
@@ -42,6 +43,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryM
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
@@ -55,6 +57,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -413,10 +417,12 @@ public class DistributionPlanner {
     private PlanNode planAggregationWithTimeJoin(
         TimeJoinNode root, DistributionPlanContext context) {
 
-      // Step 1: construct AggregationDescriptor for AggregationNode
+      List<SeriesAggregationSourceNode> sources = splitAggregationSourceByPartition(root, context);
+      Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup =
+          sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
+
+      // construct AggregationDescriptor for AggregationNode
       List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
-      List<SeriesAggregationSourceNode> sources = new ArrayList<>();
-      Map<PartialPath, Integer> regionCountPerSeries = new HashMap<>();
       for (PlanNode child : root.getChildren()) {
         SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode) child;
         handle
@@ -429,32 +435,7 @@ public class DistributionPlanner {
                           AggregationStep.FINAL,
                           descriptor.getInputExpressions()));
                 });
-        List<TRegionReplicaSet> dataDistribution =
-            analysis.getPartitionInfo(handle.getPartitionPath(), handle.getPartitionTimeFilter());
-        for (TRegionReplicaSet dataRegion : dataDistribution) {
-          SeriesAggregationSourceNode split = (SeriesAggregationSourceNode) handle.clone();
-          split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
-          split.setRegionReplicaSet(dataRegion);
-          // Let each split reference different object of AggregationDescriptorList
-          split.setAggregationDescriptorList(
-              handle.getAggregationDescriptorList().stream()
-                  .map(AggregationDescriptor::deepClone)
-                  .collect(Collectors.toList()));
-          sources.add(split);
-        }
-        regionCountPerSeries.put(handle.getPartitionPath(), dataDistribution.size());
-      }
-
-      // Step 2: change the step for each SeriesAggregationSourceNode according to its split count
-      for (SeriesAggregationSourceNode node : sources) {
-        boolean isFinal = regionCountPerSeries.get(node.getPartitionPath()) == 1;
-        node.getAggregationDescriptorList()
-            .forEach(d -> d.setStep(isFinal ? AggregationStep.FINAL : AggregationStep.PARTIAL));
       }
-
-      Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup =
-          sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
-
       AggregationNode aggregationNode =
           new AggregationNode(
               context.queryContext.getQueryId().genPlanNodeId(), rootAggDescriptorList);
@@ -482,6 +463,127 @@ public class DistributionPlanner {
       return aggregationNode;
     }
 
+    public PlanNode visitGroupByLevel(GroupByLevelNode root, DistributionPlanContext context) {
+      // Firstly, we build the tree structure for GroupByLevelNode
+      List<SeriesAggregationSourceNode> sources = splitAggregationSourceByPartition(root, context);
+      Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup =
+          sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
+
+      GroupByLevelNode newRoot = (GroupByLevelNode) root.clone();
+      final boolean[] addParent = {false};
+      sourceGroup.forEach(
+          (dataRegion, sourceNodes) -> {
+            if (sourceNodes.size() == 1) {
+              newRoot.addChild(sourceNodes.get(0));
+            } else {
+              if (!addParent[0]) {
+                sourceNodes.forEach(newRoot::addChild);
+                addParent[0] = true;
+              } else {
+                // We clone a TimeJoinNode from root to make the params to be consistent.
+                // But we need to assign a new ID to it
+                GroupByLevelNode parentOfGroup = (GroupByLevelNode) root.clone();
+                parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+                sourceNodes.forEach(parentOfGroup::addChild);
+                newRoot.addChild(parentOfGroup);
+              }
+            }
+          });
+
+      // Then, we calculate the attributes for GroupByLevelNode in each level
+      calculateGroupByLevelNodeAttributes(newRoot, 0);
+      return null;
+    }
+
+    private void calculateGroupByLevelNodeAttributes(PlanNode node, int level) {
+      if (node == null) {
+        return;
+      }
+      node.getChildren().forEach(child -> calculateGroupByLevelNodeAttributes(child, level + 1));
+      if (!(node instanceof GroupByLevelNode)) {
+        return;
+      }
+      GroupByLevelNode handle = (GroupByLevelNode) node;
+
+      // Construct all outputColumns from children
+      List<String> childrenOutputColumns = new ArrayList<>();
+      handle
+          .getChildren()
+          .forEach(child -> childrenOutputColumns.addAll(child.getOutputColumnNames()));
+
+      // Check every OutputColumn of GroupByLevelNode and set the Expression of corresponding
+      // AggregationDescriptor
+      List<String> outputColumnList = new ArrayList<>();
+      List<AggregationDescriptor> descriptorList = new ArrayList<>();
+      for (int i = 0; i < handle.getOutputColumnNames().size(); i++) {
+        String column = handle.getOutputColumnNames().get(i);
+        Set<Expression> originalExpressions =
+            analysis.getAggregationExpressions().getOrDefault(column, new HashSet<>());
+        List<Expression> descriptorExpression = new ArrayList<>();
+        for (String childColumn : childrenOutputColumns) {
+          if (childColumn.equals(column)) {
+            try {
+              descriptorExpression.add(new TimeSeriesOperand(new PartialPath(childColumn)));
+            } catch (IllegalPathException e) {
+              throw new RuntimeException("error when plan distribution aggregation query", e);
+            }
+            continue;
+          }
+          for (Expression exp : originalExpressions) {
+            if (exp.getExpressionString().equals(childColumn)) {
+              descriptorExpression.add(exp);
+            }
+          }
+        }
+        if (descriptorExpression.size() == 0) {
+          continue;
+        }
+        AggregationDescriptor descriptor = handle.getAggregationDescriptorList().get(i).deepClone();
+        descriptor.setStep(level == 0 ? AggregationStep.FINAL : AggregationStep.PARTIAL);
+        descriptor.setInputExpressions(descriptorExpression);
+
+        outputColumnList.add(column);
+        descriptorList.add(descriptor);
+      }
+      handle.getOutputColumnNames().clear();
+      handle.getOutputColumnNames().addAll(outputColumnList);
+      handle.getAggregationDescriptorList().clear();
+      handle.getAggregationDescriptorList().addAll(descriptorList);
+    }
+
+    private List<SeriesAggregationSourceNode> splitAggregationSourceByPartition(
+        MultiChildNode root, DistributionPlanContext context) {
+      // Step 1: split SeriesAggregationSourceNode according to data partition
+      List<SeriesAggregationSourceNode> sources = new ArrayList<>();
+      Map<PartialPath, Integer> regionCountPerSeries = new HashMap<>();
+      for (PlanNode child : root.getChildren()) {
+        SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode) child;
+        List<TRegionReplicaSet> dataDistribution =
+            analysis.getPartitionInfo(handle.getPartitionPath(), handle.getPartitionTimeFilter());
+        for (TRegionReplicaSet dataRegion : dataDistribution) {
+          SeriesAggregationSourceNode split = (SeriesAggregationSourceNode) handle.clone();
+          split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+          split.setRegionReplicaSet(dataRegion);
+          // Let each split reference different object of AggregationDescriptorList
+          split.setAggregationDescriptorList(
+              handle.getAggregationDescriptorList().stream()
+                  .map(AggregationDescriptor::deepClone)
+                  .collect(Collectors.toList()));
+          sources.add(split);
+        }
+        regionCountPerSeries.put(handle.getPartitionPath(), dataDistribution.size());
+      }
+
+      // Step 2: change the step for each SeriesAggregationSourceNode according to its split count
+      for (SeriesAggregationSourceNode source : sources) {
+        boolean isFinal = regionCountPerSeries.get(source.getPartitionPath()) == 1;
+        source
+            .getAggregationDescriptorList()
+            .forEach(d -> d.setStep(isFinal ? AggregationStep.FINAL : AggregationStep.PARTIAL));
+      }
+      return sources;
+    }
+
     public PlanNode visit(PlanNode node, DistributionPlanContext context) {
       return node.accept(this, context);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
index c6933fd19d..f1667f7657 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
@@ -42,7 +42,7 @@ public class AggregationDescriptor {
    *
    * <p>example: select sum(s1) from root.sg.d1; expression [root.sg.d1.s1] will be in this field.
    */
-  private final List<Expression> inputExpressions;
+  private List<Expression> inputExpressions;
 
   private String parametersString;
 
@@ -122,6 +122,10 @@ public class AggregationDescriptor {
     this.step = step;
   }
 
+  public void setInputExpressions(List<Expression> inputExpressions) {
+    this.inputExpressions = inputExpressions;
+  }
+
   public AggregationDescriptor deepClone() {
     return new AggregationDescriptor(
         this.getAggregationType(), this.step, this.getInputExpressions());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
index bc88569410..a1025fe686 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
@@ -44,7 +44,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;


[iotdb] 01/02: remove useless println

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/agg_distribution_plan
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c7364bef4676b0f166c86d8871fe2b0598cc4eba
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu May 19 12:15:48 2022 +0800

    remove useless println
---
 .../org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java    | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
index 606e9b09f0..bc88569410 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
@@ -93,7 +93,6 @@ public class DistributionPlannerTest {
     DistributionPlanner planner =
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
     DistributedQueryPlan plan = planner.planFragments();
-    plan.getInstances().forEach(System.out::println);
     assertEquals(2, plan.getInstances().size());
   }
 
@@ -191,7 +190,6 @@ public class DistributionPlannerTest {
         new DistributionPlanner(
             analysis, new LogicalQueryPlan(new MPPQueryContext(queryId), root2));
     PlanNode newRoot2 = planner2.rewriteSource();
-    System.out.println(PlanNodeUtil.nodeToString(newRoot2));
     assertEquals(newRoot2.getChildren().get(0).getChildren().size(), 2);
   }
 
@@ -521,7 +519,6 @@ public class DistributionPlannerTest {
     DistributionPlanner planner =
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, insertRowNode));
     DistributedQueryPlan plan = planner.planFragments();
-    plan.getInstances().forEach(System.out::println);
     assertEquals(1, plan.getInstances().size());
   }
 
@@ -570,7 +567,6 @@ public class DistributionPlannerTest {
     DistributionPlanner planner =
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, node));
     DistributedQueryPlan plan = planner.planFragments();
-    plan.getInstances().forEach(System.out::println);
     assertEquals(1, plan.getInstances().size());
   }