You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/27 13:32:39 UTC

[iotdb] 06/07: fix the bug in Align by device

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/align_by_device_distribution
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 992fa36d7faac7794f8a7f0929c5f1bf0be0648b
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri May 27 21:12:28 2022 +0800

    fix the bug in Align by device
---
 .../db/mpp/plan/planner/LocalExecutionPlanner.java | 49 +++++++++++++++++++++-
 .../plan/planner/distribution/SourceRewriter.java  |  4 +-
 .../planner/plan/node/process/DeviceMergeNode.java |  7 +++-
 .../planner/plan/node/process/DeviceViewNode.java  |  8 ++--
 .../plan/node/process/GroupByLevelNode.java        |  9 ++++
 5 files changed, 69 insertions(+), 8 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 6587e05306..08040f0d06 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -143,6 +143,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSc
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OutputColumn;
 import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
@@ -785,7 +786,53 @@ public class LocalExecutionPlanner {
 
     @Override
     public Operator visitGroupByLevel(GroupByLevelNode node, LocalExecutionPlanContext context) {
-      return super.visitGroupByLevel(node, context);
+      checkArgument(
+          node.getGroupByLevelDescriptors().size() >= 1,
+          "GroupByLevel descriptorList cannot be empty");
+      List<Operator> children =
+          node.getChildren().stream()
+              .map(child -> child.accept(this, context))
+              .collect(Collectors.toList());
+      boolean ascending = node.getScanOrder() == OrderBy.TIMESTAMP_ASC;
+      List<Aggregator> aggregators = new ArrayList<>();
+      Map<String, List<InputLocation>> layout = makeLayout(node);
+      for (GroupByLevelDescriptor descriptor : node.getGroupByLevelDescriptors()) {
+        List<String> inputColumnNames = descriptor.getInputColumnNames();
+        // it may include double parts
+        List<List<InputLocation>> inputLocationParts = new ArrayList<>(inputColumnNames.size());
+        inputColumnNames.forEach(o -> inputLocationParts.add(layout.get(o)));
+
+        List<InputLocation[]> inputLocationList = new ArrayList<>();
+        for (int i = 0; i < inputLocationParts.get(0).size(); i++) {
+          if (inputColumnNames.size() == 1) {
+            inputLocationList.add(new InputLocation[] {inputLocationParts.get(0).get(i)});
+          } else {
+            inputLocationList.add(
+                new InputLocation[] {
+                    inputLocationParts.get(0).get(i), inputLocationParts.get(1).get(i)
+                });
+          }
+        }
+
+        aggregators.add(
+            new Aggregator(
+                AccumulatorFactory.createAccumulator(
+                    descriptor.getAggregationType(),
+                    context
+                        .getTypeProvider()
+                        // get the type of first inputExpression
+                        .getType(descriptor.getInputExpressions().get(0).toString()),
+                    ascending),
+                descriptor.getStep(),
+                inputLocationList));
+      }
+      OperatorContext operatorContext =
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              AggregationOperator.class.getSimpleName());
+      return new AggregationOperator(
+          operatorContext, aggregators, children, ascending, node.getGroupByTimeParameter());
     }
 
     @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 014aec6443..64a348ad76 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -153,11 +153,11 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
     private PlanNode buildPlanNodeInRegion(
         PlanNode root, TRegionReplicaSet regionReplicaSet, MPPQueryContext context) {
       List<PlanNode> children =
-          root.getChildren().stream().map(PlanNodeUtil::deepCopy).collect(Collectors.toList());
+          root.getChildren().stream().map(child -> buildPlanNodeInRegion(child, regionReplicaSet, context)).collect(Collectors.toList());
       PlanNode newRoot = root.cloneWithChildren(children);
       newRoot.setPlanNodeId(context.getQueryId().genPlanNodeId());
       if (newRoot instanceof SourceNode) {
-        ((SourceNode) root).setRegionReplicaSet(regionReplicaSet);
+        ((SourceNode) newRoot).setRegionReplicaSet(regionReplicaSet);
       }
       return newRoot;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
index d8456f4968..f10b69a474 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
@@ -41,8 +41,6 @@ public class DeviceMergeNode extends MultiChildNode {
   // the list of selected devices
   private final List<String> devices;
 
-  private final List<PlanNode> children;
-
   public DeviceMergeNode(
       PlanNodeId id, List<PlanNode> children, List<OrderBy> mergeOrders, List<String> devices) {
     super(id);
@@ -146,4 +144,9 @@ public class DeviceMergeNode extends MultiChildNode {
   public int hashCode() {
     return Objects.hash(super.hashCode(), mergeOrders, devices, children);
   }
+
+  @Override
+  public String toString() {
+    return "DeviceMerge-" + this.getPlanNodeId();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
index 3d6dfcb8f1..a70252c55d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
@@ -49,9 +49,6 @@ public class DeviceViewNode extends MultiChildNode {
   // The size devices and children should be the same.
   private final List<String> devices = new ArrayList<>();
 
-  // each child node whose output TsBlock contains the data belonged to one device.
-  private final List<PlanNode> children = new ArrayList<>();
-
   // Device column and measurement columns in result output
   private final List<String> outputColumnNames;
 
@@ -217,4 +214,9 @@ public class DeviceViewNode extends MultiChildNode {
         outputColumnNames,
         deviceToMeasurementIndexesMap);
   }
+
+  @Override
+  public String toString() {
+    return "DeviceView-" + this.getPlanNodeId();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
index 7210bf449e..34c5bc7708 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
@@ -161,6 +161,15 @@ public class GroupByLevelNode extends MultiChildNode {
         planNodeId, groupByLevelDescriptors, groupByTimeParameter, scanOrder);
   }
 
+  @Nullable
+  public GroupByTimeParameter getGroupByTimeParameter() {
+    return groupByTimeParameter;
+  }
+
+  public OrderBy getScanOrder() {
+    return scanOrder;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;