You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/30 10:05:53 UTC

[iotdb] 01/01: Fix bug in last query

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch LastQueryBug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 646a605a705bd9c64a57254faf6742c1ed23b9bb
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Dec 30 18:05:37 2022 +0800

    Fix bug in last query
---
 .../apache/iotdb/db/metadata/utils/MetaUtils.java  | 10 ++-----
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  4 +--
 .../plan/planner/distribution/SourceRewriter.java  | 35 ++++++++++++++++++++++
 .../plan/node/process/last/LastQueryNode.java      |  6 +++-
 4 files changed, 45 insertions(+), 10 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
index efdef3226b..1c42e7b741 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
@@ -41,7 +41,6 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.TreeMap;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.LOSS;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.SDT_PARAMETERS;
@@ -118,15 +117,12 @@ public class MetaUtils {
 
   public static List<PartialPath> groupAlignedSeriesWithOrder(
       List<PartialPath> fullPaths, OrderByParameter orderByParameter) {
-    fullPaths.sort(
+    List<PartialPath> res = groupAlignedSeries(fullPaths, new HashMap<>());
+    res.sort(
         orderByParameter.getSortItemList().get(0).getOrdering() == Ordering.ASC
             ? Comparator.naturalOrder()
             : Comparator.reverseOrder());
-    Map<String, AlignedPath> deviceToAlignedPathMap =
-        orderByParameter.getSortItemList().get(0).getOrdering() == Ordering.ASC
-            ? new TreeMap<>()
-            : new TreeMap<>(Collections.reverseOrder());
-    return groupAlignedSeries(fullPaths, deviceToAlignedPathMap);
+    return res;
   }
 
   private static List<PartialPath> groupAlignedSeries(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 75f07c6fc2..6a8b5cd7d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -2013,7 +2013,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
             .addOperatorContext(
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
-                TimeJoinOperator.class.getSimpleName());
+                LastQueryMergeOperator.class.getSimpleName());
 
     List<SortItem> items = node.getMergeOrderParameter().getSortItemList();
     Comparator<Binary> comparator =
@@ -2038,7 +2038,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
             .addOperatorContext(
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
-                TimeJoinOperator.class.getSimpleName());
+                LastQueryCollectOperator.class.getSimpleName());
 
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new LastQueryCollectOperator(operatorContext, children);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 20d703a945..9f4fcc27e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -61,6 +61,9 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
 
 import java.util.ArrayList;
@@ -525,6 +528,13 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
     PlanNode root = processRawMultiChildNode(node, context);
     if (context.queryMultiRegion) {
       PlanNode newRoot = genLastQueryRootNode(node, context);
+      // add sort op for each LastQueryNode if we add LastQueryMergeNode as root
+      if (newRoot instanceof LastQueryMergeNode && node.getMergeOrderParameter().isEmpty()) {
+        OrderByParameter orderByParameter =
+            new OrderByParameter(
+                Collections.singletonList(new SortItem(SortKey.TIMESERIES, Ordering.ASC)));
+        addSortForEachLastQueryNode(root, orderByParameter);
+      }
       root.getChildren().forEach(newRoot::addChild);
       return Collections.singletonList(newRoot);
     } else {
@@ -532,6 +542,31 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
     }
   }
 
+  private void addSortForEachLastQueryNode(PlanNode root, OrderByParameter orderByParameter) {
+    if (root instanceof LastQueryNode) {
+      LastQueryNode lastQueryNode = (LastQueryNode) root;
+      lastQueryNode.setMergeOrderParameter(orderByParameter);
+      lastQueryNode.setChildren(
+          lastQueryNode.getChildren().stream()
+              .sorted(
+                  Comparator.comparing(
+                      child -> {
+                        String device = "";
+                        if (child instanceof LastQueryScanNode) {
+                          device = ((LastQueryScanNode) child).getSeriesPath().getDevice();
+                        } else if (child instanceof AlignedLastQueryScanNode) {
+                          device = ((AlignedLastQueryScanNode) child).getSeriesPath().getDevice();
+                        }
+                        return device;
+                      }))
+              .collect(Collectors.toList()));
+    } else {
+      for (PlanNode child : root.getChildren()) {
+        addSortForEachLastQueryNode(child, orderByParameter);
+      }
+    }
+  }
+
   private PlanNode genLastQueryRootNode(LastQueryNode node, DistributionPlanContext context) {
     PlanNodeId id = context.queryContext.getQueryId().genPlanNodeId();
     if (context.oneSeriesInMultiRegion || !node.getMergeOrderParameter().isEmpty()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java
index b8e1cf422f..cdfde9c974 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java
@@ -44,7 +44,7 @@ public class LastQueryNode extends MultiChildProcessNode {
 
   // The result output order, which could sort by sensor and time.
   // The size of this list is 2 and the first SortItem in this list has higher priority.
-  private final OrderByParameter mergeOrderParameter;
+  private OrderByParameter mergeOrderParameter;
 
   public LastQueryNode(PlanNodeId id, Filter timeFilter, OrderByParameter mergeOrderParameter) {
     super(id);
@@ -164,4 +164,8 @@ public class LastQueryNode extends MultiChildProcessNode {
   public OrderByParameter getMergeOrderParameter() {
     return mergeOrderParameter;
   }
+
+  public void setMergeOrderParameter(OrderByParameter mergeOrderParameter) {
+    this.mergeOrderParameter = mergeOrderParameter;
+  }
 }