You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/19 05:58:57 UTC
[iotdb] branch rel/1.0 updated: [To rel/1.0][IOTDB-5216] Fix order by timeseries doesn't take effect in aligned last query (#8501)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.0 by this push:
new 8bddc5aba4 [To rel/1.0][IOTDB-5216] Fix order by timeseries doesn't take effect in aligned last query (#8501)
8bddc5aba4 is described below
commit 8bddc5aba4a0fbbfb07b2f63c3d3bb252511018d
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Mon Dec 19 13:58:51 2022 +0800
[To rel/1.0][IOTDB-5216] Fix order by timeseries doesn't take effect in aligned last query (#8501)
---
.../apache/iotdb/db/metadata/utils/MetaUtils.java | 23 ++++++++-
.../process/last/LastQuerySortOperator.java | 56 +++++++++++-----------
.../db/mpp/plan/planner/LogicalPlanBuilder.java | 6 ++-
3 files changed, 54 insertions(+), 31 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
index 765a953a53..24eda5eb1d 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
@@ -27,17 +27,21 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.Pair;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.TreeMap;
import static org.apache.iotdb.commons.conf.IoTDBConstant.LOSS;
import static org.apache.iotdb.commons.conf.IoTDBConstant.SDT_PARAMETERS;
@@ -97,8 +101,25 @@ public class MetaUtils {
}
public static List<PartialPath> groupAlignedSeries(List<PartialPath> fullPaths) {
+ return groupAlignedSeries(fullPaths, new HashMap<>());
+ }
+
+ public static List<PartialPath> groupAlignedSeriesWithOrder(
+ List<PartialPath> fullPaths, OrderByParameter orderByParameter) {
+ fullPaths.sort(
+ orderByParameter.getSortItemList().get(0).getOrdering() == Ordering.ASC
+ ? Comparator.naturalOrder()
+ : Comparator.reverseOrder());
+ Map<String, AlignedPath> deviceToAlignedPathMap =
+ orderByParameter.getSortItemList().get(0).getOrdering() == Ordering.ASC
+ ? new TreeMap<>()
+ : new TreeMap<>(Collections.reverseOrder());
+ return groupAlignedSeries(fullPaths, deviceToAlignedPathMap);
+ }
+
+ private static List<PartialPath> groupAlignedSeries(
+ List<PartialPath> fullPaths, Map<String, AlignedPath> deviceToAlignedPathMap) {
List<PartialPath> result = new ArrayList<>();
- Map<String, AlignedPath> deviceToAlignedPathMap = new HashMap<>();
for (PartialPath path : fullPaths) {
MeasurementPath measurementPath = (MeasurementPath) path;
if (!measurementPath.isUnderAlignedEntity()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
index aeb38b6b7d..522977ec2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
@@ -41,7 +41,6 @@ import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEF
// collect all last query result in the same data region and sort them according to the
// time-series's alphabetical order
public class LastQuerySortOperator implements ProcessOperator {
-
private static final int MAX_DETECT_COUNT =
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
@@ -70,6 +69,8 @@ public class LastQuerySortOperator implements ProcessOperator {
// used to cache previous TsBlock get from children
private TsBlock previousTsBlock;
+ private int previousTsBlockIndex = 0;
+
public LastQuerySortOperator(
OperatorContext operatorContext,
TsBlock cachedTsBlock,
@@ -113,12 +114,13 @@ public class LastQuerySortOperator implements ProcessOperator {
// we have consumed up data from children Operator, just return all remaining cached data in
// cachedTsBlock, tsBlockBuilder and previousTsBlock
if (currentIndex >= inputOperatorsCount) {
- while (previousTsBlock != null) {
- if (canUseDataFromCachedTsBlock(previousTsBlock)) {
- LastQueryUtil.appendLastValue(tsBlockBuilder, cachedTsBlock, cachedTsBlockRowIndex++);
- } else {
- LastQueryUtil.appendLastValue(tsBlockBuilder, previousTsBlock);
- previousTsBlock = null;
+ if (previousTsBlock != null) {
+ while (previousTsBlockIndex < previousTsBlock.getPositionCount()) {
+ if (canUseDataFromCachedTsBlock(previousTsBlock, previousTsBlockIndex)) {
+ LastQueryUtil.appendLastValue(tsBlockBuilder, cachedTsBlock, cachedTsBlockRowIndex++);
+ } else {
+ LastQueryUtil.appendLastValue(tsBlockBuilder, previousTsBlock, previousTsBlockIndex++);
+ }
}
}
TsBlock res = cachedTsBlock.subTsBlock(cachedTsBlockRowIndex);
@@ -138,32 +140,27 @@ public class LastQuerySortOperator implements ProcessOperator {
int endIndex = getEndIndex();
while ((System.nanoTime() - start < maxRuntime)
- && (currentIndex < endIndex || previousTsBlock != null)
+ && (currentIndex < endIndex
+ || (previousTsBlock != null
+ && previousTsBlockIndex < previousTsBlock.getPositionCount()))
&& !tsBlockBuilder.isFull()) {
- if (previousTsBlock != null) {
- if (canUseDataFromCachedTsBlock(previousTsBlock)) {
- LastQueryUtil.appendLastValue(tsBlockBuilder, cachedTsBlock, cachedTsBlockRowIndex++);
- } else {
- LastQueryUtil.appendLastValue(tsBlockBuilder, previousTsBlock);
- previousTsBlock = null;
- }
- } else {
+ if (previousTsBlock == null || previousTsBlock.getPositionCount() <= previousTsBlockIndex) {
if (children.get(currentIndex).hasNext()) {
- TsBlock tsBlock = children.get(currentIndex).next();
- if (tsBlock == null) {
+ previousTsBlock = children.get(currentIndex).next();
+ previousTsBlockIndex = 0;
+ if (previousTsBlock == null) {
return null;
- } else if (!tsBlock.isEmpty()) {
- if (canUseDataFromCachedTsBlock(tsBlock)) {
- LastQueryUtil.appendLastValue(tsBlockBuilder, cachedTsBlock, cachedTsBlockRowIndex++);
- previousTsBlock = tsBlock;
- } else {
- // it is safe to append the whole TsBlock
- LastQueryUtil.appendLastValue(tsBlockBuilder, tsBlock);
- }
}
}
currentIndex++;
}
+ if (previousTsBlockIndex < previousTsBlock.getPositionCount()) {
+ if (canUseDataFromCachedTsBlock(previousTsBlock, previousTsBlockIndex)) {
+ LastQueryUtil.appendLastValue(tsBlockBuilder, cachedTsBlock, cachedTsBlockRowIndex++);
+ } else {
+ LastQueryUtil.appendLastValue(tsBlockBuilder, previousTsBlock, previousTsBlockIndex++);
+ }
+ }
}
TsBlock res = tsBlockBuilder.build();
@@ -176,7 +173,7 @@ public class LastQuerySortOperator implements ProcessOperator {
return currentIndex < inputOperatorsCount
|| cachedTsBlockRowIndex < cachedTsBlockSize
|| !tsBlockBuilder.isEmpty()
- || previousTsBlock != null;
+ || (previousTsBlock != null && previousTsBlockIndex < previousTsBlock.getPositionCount());
}
@Override
@@ -222,9 +219,10 @@ public class LastQuerySortOperator implements ProcessOperator {
return currentIndex + Math.min(MAX_DETECT_COUNT, inputOperatorsCount - currentIndex);
}
- private boolean canUseDataFromCachedTsBlock(TsBlock tsBlock) {
+ private boolean canUseDataFromCachedTsBlock(TsBlock tsBlock, int index) {
return cachedTsBlockRowIndex < cachedTsBlockSize
- && compareTimeSeries(cachedTsBlock, cachedTsBlockRowIndex, tsBlock, 0, timeSeriesComparator)
+ && compareTimeSeries(
+ cachedTsBlock, cachedTsBlockRowIndex, tsBlock, index, timeSeriesComparator)
< 0;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index a04f877064..887456d267 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -196,7 +196,11 @@ public class LogicalPlanBuilder {
sourceExpressions.stream()
.map(expression -> ((TimeSeriesOperand) expression).getPath())
.collect(Collectors.toList());
- List<PartialPath> groupedPaths = MetaUtils.groupAlignedSeries(selectedPaths);
+
+ List<PartialPath> groupedPaths =
+ mergeOrderParameter.getSortItemList().isEmpty()
+ ? MetaUtils.groupAlignedSeries(selectedPaths)
+ : MetaUtils.groupAlignedSeriesWithOrder(selectedPaths, mergeOrderParameter);
for (PartialPath path : groupedPaths) {
if (path instanceof MeasurementPath) { // non-aligned series
sourceNodeList.add(