You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/11 12:01:02 UTC
[iotdb] 12/13: implement createSubNode() for consumeAllNode
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch advancePipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1adc1a9bbc00eaf018e01faacb8f75ae959ba7b5
Author: Alima777 <wx...@gmail.com>
AuthorDate: Sat Feb 11 19:24:38 2023 +0800
implement createSubNode() for consumeAllNode
---
.../plan/planner/distribution/SourceRewriter.java | 16 ++------------
.../db/mpp/plan/planner/plan/node/PlanNode.java | 2 +-
.../planner/plan/node/process/AggregationNode.java | 25 ++++++++++++++--------
.../planner/plan/node/process/DeviceMergeNode.java | 7 ++----
.../plan/node/process/GroupByLevelNode.java | 7 ++----
.../planner/plan/node/process/GroupByTagNode.java | 12 +++--------
.../planner/plan/node/process/MergeSortNode.java | 3 +--
7 files changed, 27 insertions(+), 45 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index bbc973ab5f..ad0fbfcc09 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -1060,7 +1060,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
private Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>>
splitAggregationSourceByPartition(PlanNode root, DistributionPlanContext context) {
// Step 0: get all SeriesAggregationSourceNode in PlanNodeTree
- List<SeriesAggregationSourceNode> rawSources = findAggregationSourceNode(root);
+ List<SeriesAggregationSourceNode> rawSources = AggregationNode.findAggregationSourceNode(root);
// Step 1: construct SeriesAggregationSourceNode for each data region of one Path
List<SeriesAggregationSourceNode> sources = new ArrayList<>();
@@ -1092,7 +1092,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
boolean[] eachSeriesOneRegion,
Map<PartialPath, Integer> regionCountPerSeries) {
// Step 0: get all SeriesAggregationSourceNode in PlanNodeTree
- List<SeriesAggregationSourceNode> rawSources = findAggregationSourceNode(root);
+ List<SeriesAggregationSourceNode> rawSources = AggregationNode.findAggregationSourceNode(root);
// Step 1: construct SeriesAggregationSourceNode for each data region of one Path
for (SeriesAggregationSourceNode child : rawSources) {
@@ -1144,18 +1144,6 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
return dataDistribution.size();
}
- private List<SeriesAggregationSourceNode> findAggregationSourceNode(PlanNode node) {
- if (node == null) {
- return new ArrayList<>();
- }
- if (node instanceof SeriesAggregationSourceNode) {
- return Collections.singletonList((SeriesAggregationSourceNode) node);
- }
- List<SeriesAggregationSourceNode> ret = new ArrayList<>();
- node.getChildren().forEach(child -> ret.addAll(findAggregationSourceNode(child)));
- return ret;
- }
-
public List<PlanNode> visit(PlanNode node, DistributionPlanContext context) {
return node.accept(this, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
index ae66ddc843..280ebe48e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
@@ -75,7 +75,7 @@ public abstract class PlanNode implements IConsensusRequest {
*/
public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
throw new UnsupportedOperationException(
- String.format("Can't Create subNode for %s", this.getClass().toString()));
+ String.format("Can't create subNode for %s", this.getClass().toString()));
}
public PlanNode cloneWithChildren(List<PlanNode> children) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
index 69b8eb02db..479fa24996 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByParameter;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
@@ -36,6 +37,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -167,16 +169,9 @@ public class AggregationNode extends MultiChildProcessNode {
}
public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
- return new AggregationNode(
+ return new HorizontallyConcatNode(
new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
- children.subList(startIndex, endIndex),
- // TODO figure out the relation of aggregationDescriptorList and children node
- getAggregationDescriptorList().subList(startIndex, endIndex),
- getGroupByTimeParameter(),
- getGroupByParameter(),
- getGroupByExpression(),
- outputEndTime,
- getScanOrder());
+ new ArrayList<>(children.subList(startIndex, endIndex)));
}
@Override
@@ -194,6 +189,18 @@ public class AggregationNode extends MultiChildProcessNode {
return outputColumnNames;
}
+ public static List<SeriesAggregationSourceNode> findAggregationSourceNode(PlanNode node) {
+ if (node == null) {
+ return new ArrayList<>();
+ }
+ if (node instanceof SeriesAggregationSourceNode) {
+ return Collections.singletonList((SeriesAggregationSourceNode) node);
+ }
+ List<SeriesAggregationSourceNode> ret = new ArrayList<>();
+ node.getChildren().forEach(child -> ret.addAll(findAggregationSourceNode(child)));
+ return ret;
+ }
+
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitAggregation(this, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
index c56d965c49..faa21947d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
@@ -65,11 +65,8 @@ public class DeviceMergeNode extends MultiChildProcessNode {
@Override
public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
- return new DeviceMergeNode(
- new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
- getMergeOrderParameter(),
- // TODO figure out the relation of devices and children node
- devices.subList(startIndex, endIndex));
+ throw new UnsupportedOperationException(
+ "DeviceMergeNode should have only one local child in single data region.");
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
index 8959eae92b..f1833a7cee 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
@@ -95,12 +95,9 @@ public class GroupByLevelNode extends MultiChildProcessNode {
@Override
public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
- return new GroupByLevelNode(
+ return new HorizontallyConcatNode(
new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
- // TODO figure out the relation of aggregationDescriptorList and children node
- this.groupByLevelDescriptors.subList(startIndex, endIndex),
- this.groupByTimeParameter,
- this.scanOrder);
+ new ArrayList<>(children.subList(startIndex, endIndex)));
}
public List<CrossSeriesAggregationDescriptor> getGroupByLevelDescriptors() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
index 485aea5733..d0333e79a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
@@ -28,9 +28,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.commons.lang3.Validate;
-
import javax.annotation.Nullable;
+import org.apache.commons.lang3.Validate;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -101,14 +100,9 @@ public class GroupByTagNode extends MultiChildProcessNode {
@Override
public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
- return new GroupByTagNode(
+ return new HorizontallyConcatNode(
new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
- this.groupByTimeParameter,
- this.scanOrder,
- // TODO figure out the relation of aggregationDescriptorList and children node
- this.tagKeys,
- this.tagValuesToAggregationDescriptors,
- this.outputColumnNames);
+ new ArrayList<>(children.subList(startIndex, endIndex)));
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java
index 6c69a70722..6fbd9e7ab4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java
@@ -59,8 +59,7 @@ public class MergeSortNode extends MultiChildProcessNode {
return new MergeSortNode(
new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
getMergeOrderParameter(),
- // TODO figure out the relation of outputColumns and children node
- outputColumns.subList(startIndex, endIndex));
+ outputColumns);
}
@Override