You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/11 12:01:02 UTC

[iotdb] 12/13: implement createSubNode() for consumeAllNode

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch advancePipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1adc1a9bbc00eaf018e01faacb8f75ae959ba7b5
Author: Alima777 <wx...@gmail.com>
AuthorDate: Sat Feb 11 19:24:38 2023 +0800

    implement createSubNode() for consumeAllNode
---
 .../plan/planner/distribution/SourceRewriter.java  | 16 ++------------
 .../db/mpp/plan/planner/plan/node/PlanNode.java    |  2 +-
 .../planner/plan/node/process/AggregationNode.java | 25 ++++++++++++++--------
 .../planner/plan/node/process/DeviceMergeNode.java |  7 ++----
 .../plan/node/process/GroupByLevelNode.java        |  7 ++----
 .../planner/plan/node/process/GroupByTagNode.java  | 12 +++--------
 .../planner/plan/node/process/MergeSortNode.java   |  3 +--
 7 files changed, 27 insertions(+), 45 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index bbc973ab5f..ad0fbfcc09 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -1060,7 +1060,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
   private Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>>
       splitAggregationSourceByPartition(PlanNode root, DistributionPlanContext context) {
     // Step 0: get all SeriesAggregationSourceNode in PlanNodeTree
-    List<SeriesAggregationSourceNode> rawSources = findAggregationSourceNode(root);
+    List<SeriesAggregationSourceNode> rawSources = AggregationNode.findAggregationSourceNode(root);
 
     // Step 1: construct SeriesAggregationSourceNode for each data region of one Path
     List<SeriesAggregationSourceNode> sources = new ArrayList<>();
@@ -1092,7 +1092,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
           boolean[] eachSeriesOneRegion,
           Map<PartialPath, Integer> regionCountPerSeries) {
     // Step 0: get all SeriesAggregationSourceNode in PlanNodeTree
-    List<SeriesAggregationSourceNode> rawSources = findAggregationSourceNode(root);
+    List<SeriesAggregationSourceNode> rawSources = AggregationNode.findAggregationSourceNode(root);
 
     // Step 1: construct SeriesAggregationSourceNode for each data region of one Path
     for (SeriesAggregationSourceNode child : rawSources) {
@@ -1144,18 +1144,6 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
     return dataDistribution.size();
   }
 
-  private List<SeriesAggregationSourceNode> findAggregationSourceNode(PlanNode node) {
-    if (node == null) {
-      return new ArrayList<>();
-    }
-    if (node instanceof SeriesAggregationSourceNode) {
-      return Collections.singletonList((SeriesAggregationSourceNode) node);
-    }
-    List<SeriesAggregationSourceNode> ret = new ArrayList<>();
-    node.getChildren().forEach(child -> ret.addAll(findAggregationSourceNode(child)));
-    return ret;
-  }
-
   public List<PlanNode> visit(PlanNode node, DistributionPlanContext context) {
     return node.accept(this, context);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
index ae66ddc843..280ebe48e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
@@ -75,7 +75,7 @@ public abstract class PlanNode implements IConsensusRequest {
    */
   public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
     throw new UnsupportedOperationException(
-        String.format("Can't Create subNode for %s", this.getClass().toString()));
+        String.format("Can't create subNode for %s", this.getClass().toString()));
   }
 
   public PlanNode cloneWithChildren(List<PlanNode> children) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
index 69b8eb02db..479fa24996 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
@@ -36,6 +37,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -167,16 +169,9 @@ public class AggregationNode extends MultiChildProcessNode {
   }
 
   public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
-    return new AggregationNode(
+    return new HorizontallyConcatNode(
         new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
-        children.subList(startIndex, endIndex),
-        // TODO figure out the relation of aggregationDescriptorList and children node
-        getAggregationDescriptorList().subList(startIndex, endIndex),
-        getGroupByTimeParameter(),
-        getGroupByParameter(),
-        getGroupByExpression(),
-        outputEndTime,
-        getScanOrder());
+        new ArrayList<>(children.subList(startIndex, endIndex)));
   }
 
   @Override
@@ -194,6 +189,18 @@ public class AggregationNode extends MultiChildProcessNode {
     return outputColumnNames;
   }
 
+  public static List<SeriesAggregationSourceNode> findAggregationSourceNode(PlanNode node) {
+    if (node == null) {
+      return new ArrayList<>();
+    }
+    if (node instanceof SeriesAggregationSourceNode) {
+      return Collections.singletonList((SeriesAggregationSourceNode) node);
+    }
+    List<SeriesAggregationSourceNode> ret = new ArrayList<>();
+    node.getChildren().forEach(child -> ret.addAll(findAggregationSourceNode(child)));
+    return ret;
+  }
+
   @Override
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitAggregation(this, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
index c56d965c49..faa21947d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
@@ -65,11 +65,8 @@ public class DeviceMergeNode extends MultiChildProcessNode {
 
   @Override
   public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
-    return new DeviceMergeNode(
-        new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
-        getMergeOrderParameter(),
-        // TODO figure out the relation of devices and children node
-        devices.subList(startIndex, endIndex));
+    throw new UnsupportedOperationException(
+        "DeviceMergeNode should have only one local child in single data region.");
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
index 8959eae92b..f1833a7cee 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
@@ -95,12 +95,9 @@ public class GroupByLevelNode extends MultiChildProcessNode {
 
   @Override
   public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
-    return new GroupByLevelNode(
+    return new HorizontallyConcatNode(
         new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
-        // TODO figure out the relation of aggregationDescriptorList and children node
-        this.groupByLevelDescriptors.subList(startIndex, endIndex),
-        this.groupByTimeParameter,
-        this.scanOrder);
+        new ArrayList<>(children.subList(startIndex, endIndex)));
   }
 
   public List<CrossSeriesAggregationDescriptor> getGroupByLevelDescriptors() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
index 485aea5733..d0333e79a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
@@ -28,9 +28,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import org.apache.commons.lang3.Validate;
-
 import javax.annotation.Nullable;
+import org.apache.commons.lang3.Validate;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -101,14 +100,9 @@ public class GroupByTagNode extends MultiChildProcessNode {
 
   @Override
   public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
-    return new GroupByTagNode(
+    return new HorizontallyConcatNode(
         new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
-        this.groupByTimeParameter,
-        this.scanOrder,
-        // TODO figure out the relation of aggregationDescriptorList and children node
-        this.tagKeys,
-        this.tagValuesToAggregationDescriptors,
-        this.outputColumnNames);
+        new ArrayList<>(children.subList(startIndex, endIndex)));
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java
index 6c69a70722..6fbd9e7ab4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java
@@ -59,8 +59,7 @@ public class MergeSortNode extends MultiChildProcessNode {
     return new MergeSortNode(
         new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
         getMergeOrderParameter(),
-        // TODO figure out the relation of outputColumns and children node
-        outputColumns.subList(startIndex, endIndex));
+        outputColumns);
   }
 
   @Override