You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/01/03 07:53:29 UTC

[iotdb] branch rel/1.0 updated: [IOTDB-5245] Fix bug in last query (#8716)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.0 by this push:
     new c859c6a4be [IOTDB-5245] Fix bug in last query (#8716)
c859c6a4be is described below

commit c859c6a4bed821d386d66ee26ae53cdbc3029176
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Tue Jan 3 15:53:24 2023 +0800

    [IOTDB-5245] Fix bug in last query (#8716)
    
    (cherry picked from commit bc887772cdf0a0fe4a091bab221b2ff36271d73b)
---
 .../apache/iotdb/db/metadata/utils/MetaUtils.java  | 10 ++----
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  4 +--
 .../plan/planner/distribution/SourceRewriter.java  | 38 ++++++++++++++++++++++
 .../plan/node/process/last/LastQueryNode.java      |  6 +++-
 4 files changed, 48 insertions(+), 10 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
index 24eda5eb1d..960c453b73 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
@@ -41,7 +41,6 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.TreeMap;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.LOSS;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.SDT_PARAMETERS;
@@ -106,15 +105,12 @@ public class MetaUtils {
 
   public static List<PartialPath> groupAlignedSeriesWithOrder(
       List<PartialPath> fullPaths, OrderByParameter orderByParameter) {
-    fullPaths.sort(
+    List<PartialPath> res = groupAlignedSeries(fullPaths, new HashMap<>());
+    res.sort(
         orderByParameter.getSortItemList().get(0).getOrdering() == Ordering.ASC
             ? Comparator.naturalOrder()
             : Comparator.reverseOrder());
-    Map<String, AlignedPath> deviceToAlignedPathMap =
-        orderByParameter.getSortItemList().get(0).getOrdering() == Ordering.ASC
-            ? new TreeMap<>()
-            : new TreeMap<>(Collections.reverseOrder());
-    return groupAlignedSeries(fullPaths, deviceToAlignedPathMap);
+    return res;
   }
 
   private static List<PartialPath> groupAlignedSeries(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 76dea3166a..ba3e661a23 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -1962,7 +1962,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
             .addOperatorContext(
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
-                TimeJoinOperator.class.getSimpleName());
+                LastQueryMergeOperator.class.getSimpleName());
 
     List<SortItem> items = node.getMergeOrderParameter().getSortItemList();
     Comparator<Binary> comparator =
@@ -1987,7 +1987,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
             .addOperatorContext(
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
-                TimeJoinOperator.class.getSimpleName());
+                LastQueryCollectOperator.class.getSimpleName());
 
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new LastQueryCollectOperator(operatorContext, children);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 85e331ad22..9cb276384e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -60,6 +60,9 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
 
 import java.util.ArrayList;
@@ -464,6 +467,13 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
     PlanNode root = processRawMultiChildNode(node, context);
     if (context.queryMultiRegion) {
       PlanNode newRoot = genLastQueryRootNode(node, context);
+      // LastQueryMergeNode is the root and no order is set: sort each LastQueryNode by timeseries
+      if (newRoot instanceof LastQueryMergeNode && node.getMergeOrderParameter().isEmpty()) {
+        OrderByParameter orderByParameter =
+            new OrderByParameter(
+                Collections.singletonList(new SortItem(SortKey.TIMESERIES, Ordering.ASC)));
+        addSortForEachLastQueryNode(root, orderByParameter);
+      }
       root.getChildren().forEach(newRoot::addChild);
       return newRoot;
     } else {
@@ -471,6 +481,34 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
     }
   }
 
+  private void addSortForEachLastQueryNode(PlanNode root, OrderByParameter orderByParameter) {
+    if (root instanceof LastQueryNode
+        && (root.getChildren().get(0) instanceof LastQueryScanNode
+            || root.getChildren().get(0) instanceof AlignedLastQueryScanNode)) {
+      LastQueryNode lastQueryNode = (LastQueryNode) root;
+      lastQueryNode.setMergeOrderParameter(orderByParameter);
+      // sort the child nodes by series path (device path for aligned scans)
+      lastQueryNode.setChildren(
+          lastQueryNode.getChildren().stream()
+              .sorted(
+                  Comparator.comparing(
+                      child -> {
+                        String fullPath = "";
+                        if (child instanceof LastQueryScanNode) {
+                          fullPath = ((LastQueryScanNode) child).getSeriesPath().getFullPath();
+                        } else if (child instanceof AlignedLastQueryScanNode) {
+                          fullPath = ((AlignedLastQueryScanNode) child).getSeriesPath().getDevice();
+                        }
+                        return fullPath;
+                      }))
+              .collect(Collectors.toList()));
+    } else {
+      for (PlanNode child : root.getChildren()) {
+        addSortForEachLastQueryNode(child, orderByParameter);
+      }
+    }
+  }
+
   private PlanNode genLastQueryRootNode(LastQueryNode node, DistributionPlanContext context) {
     PlanNodeId id = context.queryContext.getQueryId().genPlanNodeId();
     if (context.oneSeriesInMultiRegion || !node.getMergeOrderParameter().isEmpty()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java
index b8e1cf422f..cdfde9c974 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java
@@ -44,7 +44,7 @@ public class LastQueryNode extends MultiChildProcessNode {
 
   // The result output order, which could sort by sensor and time.
   // The size of this list is 2 and the first SortItem in this list has higher priority.
-  private final OrderByParameter mergeOrderParameter;
+  private OrderByParameter mergeOrderParameter;
 
   public LastQueryNode(PlanNodeId id, Filter timeFilter, OrderByParameter mergeOrderParameter) {
     super(id);
@@ -164,4 +164,8 @@ public class LastQueryNode extends MultiChildProcessNode {
   public OrderByParameter getMergeOrderParameter() {
     return mergeOrderParameter;
   }
+
+  public void setMergeOrderParameter(OrderByParameter mergeOrderParameter) {
+    this.mergeOrderParameter = mergeOrderParameter;
+  }
 }