You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/27 13:32:39 UTC
[iotdb] 06/07: fix the bug in Align by device
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/align_by_device_distribution
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 992fa36d7faac7794f8a7f0929c5f1bf0be0648b
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri May 27 21:12:28 2022 +0800
fix the bug in Align by device
---
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 49 +++++++++++++++++++++-
.../plan/planner/distribution/SourceRewriter.java | 4 +-
.../planner/plan/node/process/DeviceMergeNode.java | 7 +++-
.../planner/plan/node/process/DeviceViewNode.java | 8 ++--
.../plan/node/process/GroupByLevelNode.java | 9 ++++
5 files changed, 69 insertions(+), 8 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 6587e05306..08040f0d06 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -143,6 +143,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSc
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OutputColumn;
import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
@@ -785,7 +786,53 @@ public class LocalExecutionPlanner {
@Override
public Operator visitGroupByLevel(GroupByLevelNode node, LocalExecutionPlanContext context) {
- return super.visitGroupByLevel(node, context);
+ checkArgument(
+ node.getGroupByLevelDescriptors().size() >= 1,
+ "GroupByLevel descriptorList cannot be empty");
+ List<Operator> children =
+ node.getChildren().stream()
+ .map(child -> child.accept(this, context))
+ .collect(Collectors.toList());
+ boolean ascending = node.getScanOrder() == OrderBy.TIMESTAMP_ASC;
+ List<Aggregator> aggregators = new ArrayList<>();
+ Map<String, List<InputLocation>> layout = makeLayout(node);
+ for (GroupByLevelDescriptor descriptor : node.getGroupByLevelDescriptors()) {
+ List<String> inputColumnNames = descriptor.getInputColumnNames();
+ // it may include double parts
+ List<List<InputLocation>> inputLocationParts = new ArrayList<>(inputColumnNames.size());
+ inputColumnNames.forEach(o -> inputLocationParts.add(layout.get(o)));
+
+ List<InputLocation[]> inputLocationList = new ArrayList<>();
+ for (int i = 0; i < inputLocationParts.get(0).size(); i++) {
+ if (inputColumnNames.size() == 1) {
+ inputLocationList.add(new InputLocation[] {inputLocationParts.get(0).get(i)});
+ } else {
+ inputLocationList.add(
+ new InputLocation[] {
+ inputLocationParts.get(0).get(i), inputLocationParts.get(1).get(i)
+ });
+ }
+ }
+
+ aggregators.add(
+ new Aggregator(
+ AccumulatorFactory.createAccumulator(
+ descriptor.getAggregationType(),
+ context
+ .getTypeProvider()
+ // get the type of first inputExpression
+ .getType(descriptor.getInputExpressions().get(0).toString()),
+ ascending),
+ descriptor.getStep(),
+ inputLocationList));
+ }
+ OperatorContext operatorContext =
+ context.instanceContext.addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ AggregationOperator.class.getSimpleName());
+ return new AggregationOperator(
+ operatorContext, aggregators, children, ascending, node.getGroupByTimeParameter());
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 014aec6443..64a348ad76 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -153,11 +153,11 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
private PlanNode buildPlanNodeInRegion(
PlanNode root, TRegionReplicaSet regionReplicaSet, MPPQueryContext context) {
List<PlanNode> children =
- root.getChildren().stream().map(PlanNodeUtil::deepCopy).collect(Collectors.toList());
+ root.getChildren().stream().map(child -> buildPlanNodeInRegion(child, regionReplicaSet, context)).collect(Collectors.toList());
PlanNode newRoot = root.cloneWithChildren(children);
newRoot.setPlanNodeId(context.getQueryId().genPlanNodeId());
if (newRoot instanceof SourceNode) {
- ((SourceNode) root).setRegionReplicaSet(regionReplicaSet);
+ ((SourceNode) newRoot).setRegionReplicaSet(regionReplicaSet);
}
return newRoot;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
index d8456f4968..f10b69a474 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
@@ -41,8 +41,6 @@ public class DeviceMergeNode extends MultiChildNode {
// the list of selected devices
private final List<String> devices;
- private final List<PlanNode> children;
-
public DeviceMergeNode(
PlanNodeId id, List<PlanNode> children, List<OrderBy> mergeOrders, List<String> devices) {
super(id);
@@ -146,4 +144,9 @@ public class DeviceMergeNode extends MultiChildNode {
public int hashCode() {
return Objects.hash(super.hashCode(), mergeOrders, devices, children);
}
+
+ @Override
+ public String toString() {
+ return "DeviceMerge-" + this.getPlanNodeId();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
index 3d6dfcb8f1..a70252c55d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
@@ -49,9 +49,6 @@ public class DeviceViewNode extends MultiChildNode {
// The size devices and children should be the same.
private final List<String> devices = new ArrayList<>();
- // each child node whose output TsBlock contains the data belonged to one device.
- private final List<PlanNode> children = new ArrayList<>();
-
// Device column and measurement columns in result output
private final List<String> outputColumnNames;
@@ -217,4 +214,9 @@ public class DeviceViewNode extends MultiChildNode {
outputColumnNames,
deviceToMeasurementIndexesMap);
}
+
+ @Override
+ public String toString() {
+ return "DeviceView-" + this.getPlanNodeId();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
index 7210bf449e..34c5bc7708 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
@@ -161,6 +161,15 @@ public class GroupByLevelNode extends MultiChildNode {
planNodeId, groupByLevelDescriptors, groupByTimeParameter, scanOrder);
}
+ @Nullable
+ public GroupByTimeParameter getGroupByTimeParameter() {
+ return groupByTimeParameter;
+ }
+
+ public OrderBy getScanOrder() {
+ return scanOrder;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;