You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/19 05:58:57 UTC

[iotdb] branch rel/1.0 updated: [To rel/1.0][IOTDB-5216] Fix order by timeseries doesn't take effect in aligned last query (#8501)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.0 by this push:
     new 8bddc5aba4 [To rel/1.0][IOTDB-5216] Fix order by timeseries doesn't take effect in aligned last query (#8501)
8bddc5aba4 is described below

commit 8bddc5aba4a0fbbfb07b2f63c3d3bb252511018d
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Mon Dec 19 13:58:51 2022 +0800

    [To rel/1.0][IOTDB-5216] Fix order by timeseries doesn't take effect in aligned last query (#8501)
---
 .../apache/iotdb/db/metadata/utils/MetaUtils.java  | 23 ++++++++-
 .../process/last/LastQuerySortOperator.java        | 56 +++++++++++-----------
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    |  6 ++-
 3 files changed, 54 insertions(+), 31 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
index 765a953a53..24eda5eb1d 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
@@ -27,17 +27,21 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.metadata.mnode.IMNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.TreeMap;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.LOSS;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.SDT_PARAMETERS;
@@ -97,8 +101,25 @@ public class MetaUtils {
   }
 
   public static List<PartialPath> groupAlignedSeries(List<PartialPath> fullPaths) {
+    return groupAlignedSeries(fullPaths, new HashMap<>());
+  }
+
+  public static List<PartialPath> groupAlignedSeriesWithOrder(
+      List<PartialPath> fullPaths, OrderByParameter orderByParameter) {
+    fullPaths.sort(
+        orderByParameter.getSortItemList().get(0).getOrdering() == Ordering.ASC
+            ? Comparator.naturalOrder()
+            : Comparator.reverseOrder());
+    Map<String, AlignedPath> deviceToAlignedPathMap =
+        orderByParameter.getSortItemList().get(0).getOrdering() == Ordering.ASC
+            ? new TreeMap<>()
+            : new TreeMap<>(Collections.reverseOrder());
+    return groupAlignedSeries(fullPaths, deviceToAlignedPathMap);
+  }
+
+  private static List<PartialPath> groupAlignedSeries(
+      List<PartialPath> fullPaths, Map<String, AlignedPath> deviceToAlignedPathMap) {
     List<PartialPath> result = new ArrayList<>();
-    Map<String, AlignedPath> deviceToAlignedPathMap = new HashMap<>();
     for (PartialPath path : fullPaths) {
       MeasurementPath measurementPath = (MeasurementPath) path;
       if (!measurementPath.isUnderAlignedEntity()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
index aeb38b6b7d..522977ec2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
@@ -41,7 +41,6 @@ import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEF
 // collect all last query result in the same data region and sort them according to the
 // time-series's alphabetical order
 public class LastQuerySortOperator implements ProcessOperator {
-
   private static final int MAX_DETECT_COUNT =
       TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
 
@@ -70,6 +69,8 @@ public class LastQuerySortOperator implements ProcessOperator {
   // used to cache previous TsBlock get from children
   private TsBlock previousTsBlock;
 
+  private int previousTsBlockIndex = 0;
+
   public LastQuerySortOperator(
       OperatorContext operatorContext,
       TsBlock cachedTsBlock,
@@ -113,12 +114,13 @@ public class LastQuerySortOperator implements ProcessOperator {
     // we have consumed up data from children Operator, just return all remaining cached data in
     // cachedTsBlock, tsBlockBuilder and previousTsBlock
     if (currentIndex >= inputOperatorsCount) {
-      while (previousTsBlock != null) {
-        if (canUseDataFromCachedTsBlock(previousTsBlock)) {
-          LastQueryUtil.appendLastValue(tsBlockBuilder, cachedTsBlock, cachedTsBlockRowIndex++);
-        } else {
-          LastQueryUtil.appendLastValue(tsBlockBuilder, previousTsBlock);
-          previousTsBlock = null;
+      if (previousTsBlock != null) {
+        while (previousTsBlockIndex < previousTsBlock.getPositionCount()) {
+          if (canUseDataFromCachedTsBlock(previousTsBlock, previousTsBlockIndex)) {
+            LastQueryUtil.appendLastValue(tsBlockBuilder, cachedTsBlock, cachedTsBlockRowIndex++);
+          } else {
+            LastQueryUtil.appendLastValue(tsBlockBuilder, previousTsBlock, previousTsBlockIndex++);
+          }
         }
       }
       TsBlock res = cachedTsBlock.subTsBlock(cachedTsBlockRowIndex);
@@ -138,32 +140,27 @@ public class LastQuerySortOperator implements ProcessOperator {
     int endIndex = getEndIndex();
 
     while ((System.nanoTime() - start < maxRuntime)
-        && (currentIndex < endIndex || previousTsBlock != null)
+        && (currentIndex < endIndex
+            || (previousTsBlock != null
+                && previousTsBlockIndex < previousTsBlock.getPositionCount()))
         && !tsBlockBuilder.isFull()) {
-      if (previousTsBlock != null) {
-        if (canUseDataFromCachedTsBlock(previousTsBlock)) {
-          LastQueryUtil.appendLastValue(tsBlockBuilder, cachedTsBlock, cachedTsBlockRowIndex++);
-        } else {
-          LastQueryUtil.appendLastValue(tsBlockBuilder, previousTsBlock);
-          previousTsBlock = null;
-        }
-      } else {
+      if (previousTsBlock == null || previousTsBlock.getPositionCount() <= previousTsBlockIndex) {
         if (children.get(currentIndex).hasNext()) {
-          TsBlock tsBlock = children.get(currentIndex).next();
-          if (tsBlock == null) {
+          previousTsBlock = children.get(currentIndex).next();
+          previousTsBlockIndex = 0;
+          if (previousTsBlock == null) {
             return null;
-          } else if (!tsBlock.isEmpty()) {
-            if (canUseDataFromCachedTsBlock(tsBlock)) {
-              LastQueryUtil.appendLastValue(tsBlockBuilder, cachedTsBlock, cachedTsBlockRowIndex++);
-              previousTsBlock = tsBlock;
-            } else {
-              // it is safe to append the whole TsBlock
-              LastQueryUtil.appendLastValue(tsBlockBuilder, tsBlock);
-            }
           }
         }
         currentIndex++;
       }
+      if (previousTsBlockIndex < previousTsBlock.getPositionCount()) {
+        if (canUseDataFromCachedTsBlock(previousTsBlock, previousTsBlockIndex)) {
+          LastQueryUtil.appendLastValue(tsBlockBuilder, cachedTsBlock, cachedTsBlockRowIndex++);
+        } else {
+          LastQueryUtil.appendLastValue(tsBlockBuilder, previousTsBlock, previousTsBlockIndex++);
+        }
+      }
     }
 
     TsBlock res = tsBlockBuilder.build();
@@ -176,7 +173,7 @@ public class LastQuerySortOperator implements ProcessOperator {
     return currentIndex < inputOperatorsCount
         || cachedTsBlockRowIndex < cachedTsBlockSize
         || !tsBlockBuilder.isEmpty()
-        || previousTsBlock != null;
+        || (previousTsBlock != null && previousTsBlockIndex < previousTsBlock.getPositionCount());
   }
 
   @Override
@@ -222,9 +219,10 @@ public class LastQuerySortOperator implements ProcessOperator {
     return currentIndex + Math.min(MAX_DETECT_COUNT, inputOperatorsCount - currentIndex);
   }
 
-  private boolean canUseDataFromCachedTsBlock(TsBlock tsBlock) {
+  private boolean canUseDataFromCachedTsBlock(TsBlock tsBlock, int index) {
     return cachedTsBlockRowIndex < cachedTsBlockSize
-        && compareTimeSeries(cachedTsBlock, cachedTsBlockRowIndex, tsBlock, 0, timeSeriesComparator)
+        && compareTimeSeries(
+                cachedTsBlock, cachedTsBlockRowIndex, tsBlock, index, timeSeriesComparator)
             < 0;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index a04f877064..887456d267 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -196,7 +196,11 @@ public class LogicalPlanBuilder {
         sourceExpressions.stream()
             .map(expression -> ((TimeSeriesOperand) expression).getPath())
             .collect(Collectors.toList());
-    List<PartialPath> groupedPaths = MetaUtils.groupAlignedSeries(selectedPaths);
+
+    List<PartialPath> groupedPaths =
+        mergeOrderParameter.getSortItemList().isEmpty()
+            ? MetaUtils.groupAlignedSeries(selectedPaths)
+            : MetaUtils.groupAlignedSeriesWithOrder(selectedPaths, mergeOrderParameter);
     for (PartialPath path : groupedPaths) {
       if (path instanceof MeasurementPath) { // non-aligned series
         sourceNodeList.add(