You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/30 10:05:53 UTC
[iotdb] 01/01: Fix bug in last query
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch LastQueryBug
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 646a605a705bd9c64a57254faf6742c1ed23b9bb
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Dec 30 18:05:37 2022 +0800
Fix bug in last query
---
.../apache/iotdb/db/metadata/utils/MetaUtils.java | 10 ++-----
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 4 +--
.../plan/planner/distribution/SourceRewriter.java | 35 ++++++++++++++++++++++
.../plan/node/process/last/LastQueryNode.java | 6 +++-
4 files changed, 45 insertions(+), 10 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
index efdef3226b..1c42e7b741 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
@@ -41,7 +41,6 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.TreeMap;
import static org.apache.iotdb.commons.conf.IoTDBConstant.LOSS;
import static org.apache.iotdb.commons.conf.IoTDBConstant.SDT_PARAMETERS;
@@ -118,15 +117,12 @@ public class MetaUtils {
public static List<PartialPath> groupAlignedSeriesWithOrder(
List<PartialPath> fullPaths, OrderByParameter orderByParameter) {
- fullPaths.sort(
+ List<PartialPath> res = groupAlignedSeries(fullPaths, new HashMap<>());
+ res.sort(
orderByParameter.getSortItemList().get(0).getOrdering() == Ordering.ASC
? Comparator.naturalOrder()
: Comparator.reverseOrder());
- Map<String, AlignedPath> deviceToAlignedPathMap =
- orderByParameter.getSortItemList().get(0).getOrdering() == Ordering.ASC
- ? new TreeMap<>()
- : new TreeMap<>(Collections.reverseOrder());
- return groupAlignedSeries(fullPaths, deviceToAlignedPathMap);
+ return res;
}
private static List<PartialPath> groupAlignedSeries(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 75f07c6fc2..6a8b5cd7d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -2013,7 +2013,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
- TimeJoinOperator.class.getSimpleName());
+ LastQueryMergeOperator.class.getSimpleName());
List<SortItem> items = node.getMergeOrderParameter().getSortItemList();
Comparator<Binary> comparator =
@@ -2038,7 +2038,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
- TimeJoinOperator.class.getSimpleName());
+ LastQueryCollectOperator.class.getSimpleName());
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new LastQueryCollectOperator(operatorContext, children);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 20d703a945..9f4fcc27e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -61,6 +61,9 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
import java.util.ArrayList;
@@ -525,6 +528,13 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
PlanNode root = processRawMultiChildNode(node, context);
if (context.queryMultiRegion) {
PlanNode newRoot = genLastQueryRootNode(node, context);
+ // add sort op for each if we add LastQueryMergeNode as root
+ if (newRoot instanceof LastQueryMergeNode && node.getMergeOrderParameter().isEmpty()) {
+ OrderByParameter orderByParameter =
+ new OrderByParameter(
+ Collections.singletonList(new SortItem(SortKey.TIMESERIES, Ordering.ASC)));
+ addSortForEachLastQueryNode(root, orderByParameter);
+ }
root.getChildren().forEach(newRoot::addChild);
return Collections.singletonList(newRoot);
} else {
@@ -532,6 +542,31 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
}
}
+ private void addSortForEachLastQueryNode(PlanNode root, OrderByParameter orderByParameter) {
+ if (root instanceof LastQueryNode) {
+ LastQueryNode lastQueryNode = (LastQueryNode) root;
+ lastQueryNode.setMergeOrderParameter(orderByParameter);
+ lastQueryNode.setChildren(
+ lastQueryNode.getChildren().stream()
+ .sorted(
+ Comparator.comparing(
+ child -> {
+ String device = "";
+ if (child instanceof LastQueryScanNode) {
+ device = ((LastQueryScanNode) child).getSeriesPath().getDevice();
+ } else if (child instanceof AlignedLastQueryScanNode) {
+ device = ((AlignedLastQueryScanNode) child).getSeriesPath().getDevice();
+ }
+ return device;
+ }))
+ .collect(Collectors.toList()));
+ } else {
+ for (PlanNode child : root.getChildren()) {
+ addSortForEachLastQueryNode(child, orderByParameter);
+ }
+ }
+ }
+
private PlanNode genLastQueryRootNode(LastQueryNode node, DistributionPlanContext context) {
PlanNodeId id = context.queryContext.getQueryId().genPlanNodeId();
if (context.oneSeriesInMultiRegion || !node.getMergeOrderParameter().isEmpty()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java
index b8e1cf422f..cdfde9c974 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java
@@ -44,7 +44,7 @@ public class LastQueryNode extends MultiChildProcessNode {
// The result output order, which could sort by sensor and time.
// The size of this list is 2 and the first SortItem in this list has higher priority.
- private final OrderByParameter mergeOrderParameter;
+ private OrderByParameter mergeOrderParameter;
public LastQueryNode(PlanNodeId id, Filter timeFilter, OrderByParameter mergeOrderParameter) {
super(id);
@@ -164,4 +164,8 @@ public class LastQueryNode extends MultiChildProcessNode {
public OrderByParameter getMergeOrderParameter() {
return mergeOrderParameter;
}
+
+ public void setMergeOrderParameter(OrderByParameter mergeOrderParameter) {
+ this.mergeOrderParameter = mergeOrderParameter;
+ }
}