You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2023/09/15 12:03:07 UTC
[iotdb] branch master updated: Enhance last query, support non single base series (#11120)
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5f5a1d4fb7d Enhance last query, support non single base series (#11120)
5f5a1d4fb7d is described below
commit 5f5a1d4fb7daee377b7f3d68f72dc685a4f6f321
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Fri Sep 15 20:03:00 2023 +0800
Enhance last query, support non single base series (#11120)
---
.../operator/process/last/LastQueryOperator.java | 6 +-
.../process/last/LastQuerySortOperator.java | 4 +-
.../process/last/LastQueryTransformOperator.java | 120 ++++++++++++++++++
.../db/queryengine/plan/analyze/Analysis.java | 34 +++++-
.../queryengine/plan/analyze/AnalyzeVisitor.java | 33 +++--
.../schema/lastcache/ILastCacheContainer.java | 2 +-
.../plan/planner/LogicalPlanBuilder.java | 134 ++++++++++++++++-----
.../plan/planner/LogicalPlanVisitor.java | 11 +-
.../plan/planner/OperatorTreeGenerator.java | 26 +++-
.../plan/planner/SubPlanTypeExtractor.java | 15 +++
.../planner/distribution/DistributionPlanner.java | 4 +-
.../planner/distribution/ExchangeNodeAdder.java | 12 +-
.../SimpleFragmentParallelPlanner.java | 5 +-
.../plan/planner/distribution/SourceRewriter.java | 15 ++-
.../db/queryengine/plan/planner/plan/SubPlan.java | 5 +-
.../plan/planner/plan/node/PlanGraphPrinter.java | 9 ++
.../plan/planner/plan/node/PlanNodeType.java | 6 +-
.../plan/planner/plan/node/PlanVisitor.java | 5 +
.../node/process/last/LastQueryCollectNode.java | 17 ++-
.../plan/node/process/last/LastQueryMergeNode.java | 24 ++--
.../plan/node/process/last/LastQueryNode.java | 27 ++++-
...ollectNode.java => LastQueryTransformNode.java} | 89 ++++++++------
.../impl/mem/mnode/factory/MemMNodeFactory.java | 4 +-
.../impl/mem/mnode/impl/LogicalViewMNode.java | 64 ++++++++++
.../execution/operator/OperatorMemoryTest.java | 5 +-
.../plan/plan/QueryLogicalPlanUtil.java | 2 +-
.../plan/plan/distribution/LastQueryTest.java | 2 +-
27 files changed, 549 insertions(+), 131 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java
index 1408207c828..b1150507b55 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java
@@ -44,7 +44,7 @@ public class LastQueryOperator implements ProcessOperator {
private final OperatorContext operatorContext;
- private final List<AbstractUpdateLastCacheOperator> children;
+ private final List<Operator> children;
private final int inputOperatorsCount;
@@ -53,9 +53,7 @@ public class LastQueryOperator implements ProcessOperator {
private TsBlockBuilder tsBlockBuilder;
public LastQueryOperator(
- OperatorContext operatorContext,
- List<AbstractUpdateLastCacheOperator> children,
- TsBlockBuilder builder) {
+ OperatorContext operatorContext, List<Operator> children, TsBlockBuilder builder) {
this.operatorContext = operatorContext;
this.children = children;
this.inputOperatorsCount = children.size();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQuerySortOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQuerySortOperator.java
index f08aa156b21..aafdc71df9d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQuerySortOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQuerySortOperator.java
@@ -55,7 +55,7 @@ public class LastQuerySortOperator implements ProcessOperator {
private int cachedTsBlockRowIndex;
// we must make sure that Operator in children has already been sorted
- private final List<AbstractUpdateLastCacheOperator> children;
+ private final List<Operator> children;
private final OperatorContext operatorContext;
@@ -75,7 +75,7 @@ public class LastQuerySortOperator implements ProcessOperator {
public LastQuerySortOperator(
OperatorContext operatorContext,
TsBlock cachedTsBlock,
- List<AbstractUpdateLastCacheOperator> children,
+ List<Operator> children,
Comparator<Binary> timeSeriesComparator) {
this.cachedTsBlock = cachedTsBlock;
this.cachedTsBlockSize = cachedTsBlock.getPositionCount();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryTransformOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryTransformOperator.java
new file mode 100644
index 00000000000..0b87821bb1d
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryTransformOperator.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process.last;
+
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Turns the output of its child operator into a row of the last-query result for one view path.
+ *
+ * <p>Each call to {@link #next()} pulls one {@link TsBlock} from the child, reads a long from
+ * column 0 and a value from column 1 of its first row, and hands both, together with {@code
+ * viewPath} and {@code dataType}, to {@code LastQueryUtil.appendLastValue}.
+ */
+public class LastQueryTransformOperator implements ProcessOperator {
+
+  // path written into every produced row
+  private final String viewPath;
+
+  // data type name written into every produced row
+  private final String dataType;
+
+  private final OperatorContext operatorContext;
+
+  // the child of LastQueryTransformOperator will always be AggOperator
+  private final Operator child;
+
+  // not final: close() sets it to null
+  private TsBlockBuilder tsBlockBuilder;
+
+  /**
+   * @param viewPath path to report for the produced row
+   * @param dataType data type name to report for the produced row
+   * @param operatorContext context returned by {@link #getOperatorContext()}
+   * @param child operator whose output is transformed
+   */
+  public LastQueryTransformOperator(
+      String viewPath, String dataType, OperatorContext operatorContext, Operator child) {
+    this.viewPath = viewPath;
+    this.dataType = dataType;
+    this.operatorContext = operatorContext;
+    this.child = child;
+    this.tsBlockBuilder = LastQueryUtil.createTsBlockBuilder(1);
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return this.operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return child.isBlocked();
+  }
+
+  /**
+   * Produces the next result block.
+   *
+   * @return {@code null} when the child returns {@code null} or when the value in column 1 of the
+   *     child's first row is null; otherwise the block built from the builder, which holds no
+   *     appended row when the child's block is empty
+   */
+  @Override
+  public TsBlock next() throws Exception {
+    if (!tsBlockBuilder.isFull()) {
+      TsBlock tsBlock = child.nextWithTimer();
+      if (tsBlock == null) {
+        return null;
+      } else if (!tsBlock.isEmpty()) {
+        // nothing to report when the value column of the first row is null
+        if (tsBlock.getColumn(1).isNull(0)) {
+          return null;
+        }
+        LastQueryUtil.appendLastValue(
+            tsBlockBuilder,
+            tsBlock.getColumn(0).getLong(0),
+            viewPath,
+            tsBlock.getColumn(1).getTsPrimitiveType(0).getStringValue(),
+            dataType);
+      }
+    } else {
+      // NOTE(review): the builder is reset at the end of every call, so this branch looks
+      // unreachable - confirm before removing
+      child.close();
+    }
+
+    TsBlock res = tsBlockBuilder.build();
+    tsBlockBuilder.reset();
+    return res;
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    // use the timed variant, as next() does with nextWithTimer()
+    return child.hasNextWithTimer();
+  }
+
+  @Override
+  public boolean isFinished() throws Exception {
+    return !hasNextWithTimer();
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return Math.max(child.calculateMaxPeekMemory(), child.calculateRetainedSizeAfterCallingNext());
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return child.calculateMaxReturnSize();
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return child.calculateRetainedSizeAfterCallingNext();
+  }
+
+  /** Closes the child and drops the builder; {@link #next()} must not be called afterwards. */
+  @Override
+  public void close() throws Exception {
+    if (child != null) {
+      child.close();
+    }
+    tsBlockBuilder = null;
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
index 7aae5c96fac..8a0ac893d9a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
@@ -210,6 +210,12 @@ public class Analysis {
// timeseries, otherwise it will be null
private Ordering timeseriesOrderingForLastQuery = null;
+ // Used to store the view expressions in a last query which are non-writable
+ private Set<Expression> lastQueryNonWritableViewExpressions;
+ private Map<Expression, List<Expression>> lastQueryNonWritableViewSourceExpressionMap;
+
+ private Set<Expression> lastQueryBaseExpressions;
+
// header of result dataset
private DatasetHeader respDatasetHeader;
@@ -349,7 +355,7 @@ public class Analysis {
return null;
}
TSDataType type = expressionTypes.get(NodeRef.of(expression));
- checkArgument(type != null, "Expression not analyzed: %s", expression);
+ checkArgument(type != null, "Expression is not analyzed: %s", expression);
return type;
}
@@ -731,6 +737,32 @@ public class Analysis {
this.timeseriesOrderingForLastQuery = timeseriesOrderingForLastQuery;
}
+ /** Returns the base expressions of the last query, exactly as stored by the setter. */
+ public Set<Expression> getLastQueryBaseExpressions() {
+ return this.lastQueryBaseExpressions;
+ }
+
+ /** Stores the base expressions of the last query. */
+ public void setLastQueryBaseExpressions(Set<Expression> lastQueryBaseExpressions) {
+ this.lastQueryBaseExpressions = lastQueryBaseExpressions;
+ }
+
+ /**
+ * Returns the non-writable view expressions of the last query. The field has no initializer,
+ * so this is null until the setter is called with a non-null set.
+ */
+ public Set<Expression> getLastQueryNonWritableViewExpressions() {
+ return this.lastQueryNonWritableViewExpressions;
+ }
+
+ /**
+ * Stores the non-writable view expressions of the last query. Note that the setter name is
+ * singular while the getter and the field are plural.
+ */
+ public void setLastQueryNonWritableViewExpression(
+ Set<Expression> lastQueryNonWritableViewExpression) {
+ this.lastQueryNonWritableViewExpressions = lastQueryNonWritableViewExpression;
+ }
+
+ /**
+ * Returns the map from each non-writable view expression to its source expressions. The field
+ * has no initializer, so this is null until the setter is called with a non-null map.
+ */
+ public Map<Expression, List<Expression>> getLastQueryNonWritableViewSourceExpressionMap() {
+ return this.lastQueryNonWritableViewSourceExpressionMap;
+ }
+
+ /** Stores the map from each non-writable view expression to its source expressions. */
+ public void setLastQueryNonWritableViewSourceExpressionMap(
+ Map<Expression, List<Expression>> lastQueryNonWritableViewSourceExpressionMap) {
+ this.lastQueryNonWritableViewSourceExpressionMap = lastQueryNonWritableViewSourceExpressionMap;
+ }
+
public ModelInferenceDescriptor getModelInferenceDescriptor() {
return modelInferenceDescriptor;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index b3ca7de9f93..29208343af4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -495,20 +495,37 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
+ /**
+ * Binds every select expression of a LAST query against the schema tree and records the
+ * results on the analysis.
+ *
+ * <p>Bound expressions that are {@code TimeSeriesOperand}s are collected as base expressions.
+ * Any other bound expression is collected as a non-writable view expression, together with
+ * the source expressions found for it by {@code searchSourceExpressions}.
+ */
private void analyzeLastSource(
Analysis analysis, List<Expression> selectExpressions, ISchemaTree schemaTree) {
- Set<Expression> sourceExpressions;
-
- sourceExpressions = new LinkedHashSet<>();
+ Set<Expression> sourceExpressions = new LinkedHashSet<>();
+ Set<Expression> lastQueryBaseExpressions = new LinkedHashSet<>();
+ // created lazily below; both stay null when no non-TimeSeriesOperand expression is found
+ Set<Expression> lastQueryNonWritableViewExpressions = null;
+ Map<Expression, List<Expression>> lastQueryNonWritableViewSourceExpressionMap = null;
for (Expression selectExpression : selectExpressions) {
- for (Expression sourceExpression : bindSchemaForExpression(selectExpression, schemaTree)) {
- if (!(sourceExpression instanceof TimeSeriesOperand)) {
- throw new SemanticException(
- "Views with functions and expressions cannot be used in LAST query");
+ for (Expression lastQuerySourceExpression :
+ bindSchemaForExpression(selectExpression, schemaTree)) {
+ if (lastQuerySourceExpression instanceof TimeSeriesOperand) {
+ // a plain series is both a base expression and a source expression
+ lastQueryBaseExpressions.add(lastQuerySourceExpression);
+ sourceExpressions.add(lastQuerySourceExpression);
+ } else {
+ if (lastQueryNonWritableViewExpressions == null) {
+ lastQueryNonWritableViewExpressions = new LinkedHashSet<>();
+ lastQueryNonWritableViewSourceExpressionMap = new HashMap<>();
+ }
+ // only the sources found for the expression are added to sourceExpressions,
+ // not the expression itself
+ List<Expression> sourceExpressionsOfNonWritableView =
+ searchSourceExpressions(lastQuerySourceExpression);
+ lastQueryNonWritableViewExpressions.add(lastQuerySourceExpression);
+ lastQueryNonWritableViewSourceExpressionMap.put(
+ lastQuerySourceExpression, sourceExpressionsOfNonWritableView);
+ sourceExpressions.addAll(sourceExpressionsOfNonWritableView);
}
- sourceExpressions.add(sourceExpression);
}
}
+
analysis.setSourceExpressions(sourceExpressions);
+ analysis.setLastQueryBaseExpressions(lastQueryBaseExpressions);
+ analysis.setLastQueryNonWritableViewExpression(lastQueryNonWritableViewExpressions);
+ analysis.setLastQueryNonWritableViewSourceExpressionMap(
+ lastQueryNonWritableViewSourceExpressionMap);
}
private void updateSchemaTreeByViews(Analysis analysis, ISchemaTree originSchemaTree) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/ILastCacheContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/ILastCacheContainer.java
index 20f5eb711db..3ada99d8ebf 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/ILastCacheContainer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/ILastCacheContainer.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.tsfile.read.TimeValuePair;
/** this interface declares the operations of LastCache data */
public interface ILastCacheContainer {
- // get lastCache of monad timseries
+ // get lastCache of monad timeseries
TimeValuePair getCachedLast();
/**
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index 8f4bfd9aeea..3468cf3b81e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@ -71,6 +71,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ml.ForecastNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
@@ -108,11 +109,14 @@ import org.apache.commons.lang3.Validate;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -123,7 +127,10 @@ import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.DEVICE;
import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME;
+import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionTypeAnalyzer.analyzeExpression;
import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.LAST_VALUE;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.MAX_TIME;
public class LogicalPlanBuilder {
@@ -216,16 +223,13 @@ public class LogicalPlanBuilder {
}
public LogicalPlanBuilder planLast(
- Set<Expression> sourceExpressions, Filter globalTimeFilter, Ordering timeseriesOrdering) {
- List<PlanNode> sourceNodeList = new ArrayList<>();
+ Analysis analysis, Ordering timeseriesOrdering, Ordering resultTimeOrder, ZoneId zoneId) {
+ Set<String> deviceAlignedSet = new HashSet<>();
+ Set<String> deviceExistViewSet = new HashSet<>();
+ // <Device, <Measurement, Expression>>
+ Map<String, Map<String, Expression>> outputPathToSourceExpressionMap = new LinkedHashMap<>();
- Map<String, Boolean> deviceAlignedMap = new HashMap<>();
- Map<String, Boolean> deviceExistViewMap = new HashMap<>();
- Map<String, Map<String, Expression>> outputPathToSourceExpressionMap =
- timeseriesOrdering != null
- ? new TreeMap<>(timeseriesOrdering.getStringComparator())
- : new LinkedHashMap<>();
- for (Expression sourceExpression : sourceExpressions) {
+ for (Expression sourceExpression : analysis.getLastQueryBaseExpressions()) {
MeasurementPath outputPath =
(MeasurementPath)
(sourceExpression.isViewExpression()
@@ -240,46 +244,47 @@ public class LogicalPlanBuilder {
? new TreeMap<>(timeseriesOrdering.getStringComparator())
: new LinkedHashMap<>())
.put(outputPath.getMeasurement(), sourceExpression);
- if (!deviceAlignedMap.containsKey(outputDevice)) {
- deviceAlignedMap.put(outputDevice, outputPath.isUnderAlignedEntity());
+ if (outputPath.isUnderAlignedEntity()) {
+ deviceAlignedSet.add(outputDevice);
+ }
+ if (sourceExpression.isViewExpression()) {
+ deviceExistViewSet.add(outputDevice);
}
- deviceExistViewMap.put(
- outputDevice,
- deviceExistViewMap.getOrDefault(outputDevice, false)
- || sourceExpression.isViewExpression());
}
+ List<PlanNode> sourceNodeList = new ArrayList<>();
for (Map.Entry<String, Map<String, Expression>> deviceMeasurementExpressionEntry :
outputPathToSourceExpressionMap.entrySet()) {
String outputDevice = deviceMeasurementExpressionEntry.getKey();
- if (deviceExistViewMap.get(outputDevice)) {
+ Map<String, Expression> measurementToExpressionsOfDevice =
+ deviceMeasurementExpressionEntry.getValue();
+ if (deviceExistViewSet.contains(outputDevice)) {
// exist view
- for (Expression sourceExpression : deviceMeasurementExpressionEntry.getValue().values()) {
+ for (Expression sourceExpression : measurementToExpressionsOfDevice.values()) {
MeasurementPath selectedPath =
(MeasurementPath) ((TimeSeriesOperand) sourceExpression).getPath();
+ String outputViewPath =
+ sourceExpression.isViewExpression()
+ ? sourceExpression.getViewPath().getFullPath()
+ : null;
+
if (selectedPath.isUnderAlignedEntity()) { // aligned series
sourceNodeList.add(
new AlignedLastQueryScanNode(
context.getQueryId().genPlanNodeId(),
new AlignedPath(selectedPath),
- sourceExpression.isViewExpression()
- ? sourceExpression.getViewPath().getFullPath()
- : null));
+ outputViewPath));
} else { // non-aligned series
sourceNodeList.add(
new LastQueryScanNode(
- context.getQueryId().genPlanNodeId(),
- selectedPath,
- sourceExpression.isViewExpression()
- ? sourceExpression.getViewPath().getFullPath()
- : null));
+ context.getQueryId().genPlanNodeId(), selectedPath, outputViewPath));
}
}
} else {
- if (deviceAlignedMap.get(outputDevice)) {
+ if (deviceAlignedSet.contains(outputDevice)) {
// aligned series
List<MeasurementPath> measurementPaths =
- deviceMeasurementExpressionEntry.getValue().values().stream()
+ measurementToExpressionsOfDevice.values().stream()
.map(expression -> (MeasurementPath) ((TimeSeriesOperand) expression).getPath())
.collect(Collectors.toList());
AlignedPath alignedPath = new AlignedPath(measurementPaths.get(0).getDevicePath());
@@ -291,7 +296,7 @@ public class LogicalPlanBuilder {
context.getQueryId().genPlanNodeId(), alignedPath, null));
} else {
// non-aligned series
- for (Expression sourceExpression : deviceMeasurementExpressionEntry.getValue().values()) {
+ for (Expression sourceExpression : measurementToExpressionsOfDevice.values()) {
MeasurementPath selectedPath =
(MeasurementPath) ((TimeSeriesOperand) sourceExpression).getPath();
sourceNodeList.add(
@@ -301,12 +306,81 @@ public class LogicalPlanBuilder {
}
}
+ Set<Expression> lastQueryNonWriteViewExpressions =
+ analysis.getLastQueryNonWritableViewExpressions();
+ Map<Expression, List<Expression>> lastQueryNonWritableViewSourceExpressionMap =
+ analysis.getLastQueryNonWritableViewSourceExpressionMap();
+ if (lastQueryNonWriteViewExpressions != null) {
+ for (Expression expression : lastQueryNonWriteViewExpressions) {
+ Set<Expression> sourceTransformExpressions = Collections.singleton(expression);
+ FunctionExpression maxTimeAgg =
+ new FunctionExpression(
+ MAX_TIME, new LinkedHashMap<>(), Collections.singletonList(expression));
+ FunctionExpression lastValueAgg =
+ new FunctionExpression(
+ LAST_VALUE, new LinkedHashMap<>(), Collections.singletonList(expression));
+ analyzeExpression(analysis, expression);
+ analyzeExpression(analysis, maxTimeAgg);
+ analyzeExpression(analysis, lastValueAgg);
+
+ Set<Expression> sources =
+ new LinkedHashSet<>(lastQueryNonWritableViewSourceExpressionMap.get(expression));
+ LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
+ planBuilder =
+ planBuilder
+ .planRawDataSource(
+ sources,
+ resultTimeOrder,
+ analysis.getGlobalTimeFilter(),
+ analysis.isLastLevelUseWildcard())
+ .planWhereAndSourceTransform(
+ null, sourceTransformExpressions, false, zoneId, resultTimeOrder)
+ .planAggregation(
+ new LinkedHashSet<>(Arrays.asList(maxTimeAgg, lastValueAgg)),
+ null,
+ analysis.getGroupByTimeParameter(),
+ analysis.getGroupByParameter(),
+ false,
+ AggregationStep.SINGLE,
+ resultTimeOrder);
+
+ LastQueryTransformNode transformNode =
+ new LastQueryTransformNode(
+ context.getQueryId().genPlanNodeId(),
+ planBuilder.getRoot(),
+ expression.getViewPath().getFullPath(),
+ analysis.getType(expression).toString());
+ sourceNodeList.add(transformNode);
+ }
+ }
+
+ if (timeseriesOrdering != null) {
+ sourceNodeList.sort(
+ Comparator.comparing(
+ child -> {
+ String sortKey = "";
+ if (child instanceof LastQueryScanNode) {
+ sortKey = ((LastQueryScanNode) child).getOutputSymbolForSort();
+ } else if (child instanceof AlignedLastQueryScanNode) {
+ sortKey = ((AlignedLastQueryScanNode) child).getOutputSymbolForSort();
+ } else if (child instanceof LastQueryTransformNode) {
+ sortKey = ((LastQueryTransformNode) child).getOutputSymbolForSort();
+ }
+ return sortKey;
+ }));
+ if (timeseriesOrdering.equals(Ordering.DESC)) {
+ Collections.reverse(sourceNodeList);
+ }
+ }
+
this.root =
new LastQueryNode(
context.getQueryId().genPlanNodeId(),
sourceNodeList,
- globalTimeFilter,
- timeseriesOrdering);
+ analysis.getGlobalTimeFilter(),
+ timeseriesOrdering,
+ lastQueryNonWriteViewExpressions != null);
+
ColumnHeaderConstant.lastQueryColumnHeaders.forEach(
columnHeader ->
context
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index d1173f81865..7472147ec98 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -95,6 +95,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.commons.lang3.StringUtils;
+import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -103,6 +104,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import static org.apache.iotdb.db.queryengine.plan.statement.component.Ordering.ASC;
import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME;
/**
@@ -131,9 +133,10 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
planBuilder =
planBuilder
.planLast(
- analysis.getSourceExpressions(),
- analysis.getGlobalTimeFilter(),
- analysis.getTimeseriesOrderingForLastQuery())
+ analysis,
+ analysis.getTimeseriesOrderingForLastQuery(),
+ queryStatement.getResultTimeOrder(),
+ queryStatement.getSelectComponent().getZoneId())
.planOffset(queryStatement.getRowOffset())
.planLimit(queryStatement.getRowLimit());
@@ -620,7 +623,7 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
&& 0 != analysis.getDataPartitionInfo().getDataPartitionMap().size()) {
PlanNode lastPlanNode =
new LogicalPlanBuilder(analysis, context)
- .planLast(analysis.getSourceExpressions(), analysis.getGlobalTimeFilter(), null)
+ .planLast(analysis, null, ASC, ZoneId.systemDefault())
.getRoot();
planBuilder = planBuilder.planSchemaQueryOrderByHeat(lastPlanNode);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index b12b65714f1..7ffd8d446ea 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -92,13 +92,13 @@ import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.Mul
import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.NonOverlappedMultiColumnMerger;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.TimeComparator;
-import org.apache.iotdb.db.queryengine.execution.operator.process.last.AbstractUpdateLastCacheOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.last.AlignedUpdateLastCacheOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.last.AlignedUpdateViewPathLastCacheOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryCollectOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryMergeOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQuerySortOperator;
+import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryTransformOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryUtil;
import org.apache.iotdb.db.queryengine.execution.operator.process.last.UpdateLastCacheOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.last.UpdateViewPathLastCacheOperator;
@@ -180,6 +180,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformN
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ml.ForecastNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode;
@@ -2081,8 +2082,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
public Operator visitLastQueryScan(LastQueryScanNode node, LocalExecutionPlanContext context) {
PartialPath seriesPath = node.getSeriesPath().transformToPartialPath();
TimeValuePair timeValuePair = null;
+ context.dataNodeQueryContext.lock();
try {
- context.dataNodeQueryContext.lock();
if (!context.dataNodeQueryContext.unCached(seriesPath)) {
timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(seriesPath);
if (timeValuePair == null) {
@@ -2340,16 +2341,14 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
@Override
public Operator visitLastQuery(LastQueryNode node, LocalExecutionPlanContext context) {
-
context.setLastQueryTimeFilter(node.getTimeFilter());
context.setNeedUpdateLastCache(LastQueryUtil.needUpdateCache(node.getTimeFilter()));
context.setNeedUpdateNullEntry(LastQueryUtil.needUpdateNullEntry(node.getTimeFilter()));
- List<AbstractUpdateLastCacheOperator> operatorList =
+ List<Operator> operatorList =
node.getChildren().stream()
.map(child -> child.accept(this, context))
.filter(Objects::nonNull)
- .map(o -> (AbstractUpdateLastCacheOperator) o)
.collect(Collectors.toList());
List<Pair<TimeValuePair, Binary>> cachedLastValueAndPathList =
@@ -2451,6 +2450,23 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
return new LastQueryCollectOperator(operatorContext, children);
}
+ /**
+  * Builds the operator for a {@link LastQueryTransformNode}: generates the operator of the
+  * node's child first, then wraps it in a {@link LastQueryTransformOperator} that carries the
+  * node's view path and data type.
+  *
+  * @param node plan node to translate
+  * @param context local execution plan context used to generate and register operators
+  * @return the new {@link LastQueryTransformOperator}
+  */
+ @Override
+ public Operator visitLastQueryTransform(
+     LastQueryTransformNode node, LocalExecutionPlanContext context) {
+   Operator child = node.getChild().accept(this, context);
+   // register the context under the name of the operator that is actually created below
+   OperatorContext operatorContext =
+       context
+           .getDriverContext()
+           .addOperatorContext(
+               context.getNextOperatorId(),
+               node.getPlanNodeId(),
+               LastQueryTransformOperator.class.getSimpleName());
+
+   context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+   return new LastQueryTransformOperator(
+       node.getViewPath(), node.getDataType(), operatorContext, child);
+ }
+
private Map<String, List<InputLocation>> makeLayout(PlanNode node) {
Map<String, List<InputLocation>> outputMappings = new LinkedHashMap<>();
int tsBlockIndex = 0;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java
index 12b083c9af9..b8920cffbc7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWin
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode;
@@ -128,19 +129,33 @@ public class SubPlanTypeExtractor {
@Override
public Void visitLastQuery(LastQueryNode node, Void context) {
+ if (node.isContainsLastTransformNode()) {
+ return visitPlan(node, context);
+ }
return null;
}
@Override
public Void visitLastQueryMerge(LastQueryMergeNode node, Void context) {
+ if (node.isContainsLastTransformNode()) {
+ return visitPlan(node, context);
+ }
return null;
}
@Override
public Void visitLastQueryCollect(LastQueryCollectNode node, Void context) {
+ if (node.isContainsLastTransformNode()) {
+ return visitPlan(node, context);
+ }
return null;
}
+ @Override
+ public Void visitLastQueryTransform(LastQueryTransformNode node, Void context) {
+ return visitPlan(node, context); // unconditionally delegates to visitPlan
+ }
+
// end region PlanNode of last read
private void updateTypeProviderByAggregationDescriptor(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
index 2f14ffcd112..087ce5338eb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
@@ -196,7 +196,7 @@ public class DistributionPlanner {
List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
// Only execute this step for READ operation
if (context.getQueryType() == QueryType.READ) {
- SetSinkForRootInstance(subPlan, fragmentInstances);
+ setSinkForRootInstance(subPlan, fragmentInstances);
}
return new DistributedQueryPlan(
logicalPlan.getContext(), subPlan, subPlan.getPlanFragmentList(), fragmentInstances);
@@ -213,7 +213,7 @@ public class DistributionPlanner {
}
// TODO: (xingtanzjr) Maybe we should handle ResultNode in LogicalPlanner ?
- public void SetSinkForRootInstance(SubPlan subPlan, List<FragmentInstance> instances) {
+ public void setSinkForRootInstance(SubPlan subPlan, List<FragmentInstance> instances) {
FragmentInstance rootInstance = null;
for (FragmentInstance instance : instances) {
if (instance.getFragment().getId().equals(subPlan.getPlanFragment().getId())) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
index b031ca9947e..6f2cce9a83f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
@@ -50,6 +50,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformN
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
@@ -229,6 +230,11 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
return processMultiChildNode(node, context);
}
+ @Override
+ public PlanNode visitLastQueryTransform(LastQueryTransformNode node, NodeGroupContext context) {
+ return processOneChildNode(node, context); // the node has a single child, so use the one-child path
+ }
+
@Override
public PlanNode visitTimeJoin(TimeJoinNode node, NodeGroupContext context) {
return processMultiChildNode(node, context);
@@ -282,11 +288,7 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone();
List<PlanNode> visitedChildren = new ArrayList<>();
- node.getChildren()
- .forEach(
- child -> {
- visitedChildren.add(visit(child, context));
- });
+ node.getChildren().forEach(child -> visitedChildren.add(visit(child, context)));
TRegionReplicaSet dataRegion;
NodeDistributionType distributionType;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 2bf79e2a113..ff24932bca6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNo
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.MultiChildrenSinkNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastSeriesSourceNode;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTimeSeriesStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -183,7 +184,9 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
});
if (analysis.getStatement() instanceof QueryStatement
- || analysis.getStatement() instanceof ShowQueriesStatement) {
+ || analysis.getStatement() instanceof ShowQueriesStatement
+ || (analysis.getStatement() instanceof ShowTimeSeriesStatement
+ && ((ShowTimeSeriesStatement) analysis.getStatement()).isOrderByHeat())) {
fragmentInstance.getFragment().generateTypeProvider(queryContext.getTypeProvider());
}
instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index 4486be2b829..0253dcd9231 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -480,7 +480,10 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
LastQueryScanNode node, DistributionPlanContext context) {
LastQueryNode mergeNode =
new LastQueryNode(
- context.queryContext.getQueryId().genPlanNodeId(), node.getPartitionTimeFilter(), null);
+ context.queryContext.getQueryId().genPlanNodeId(),
+ node.getPartitionTimeFilter(),
+ null,
+ false);
return processRawSeriesScan(node, context, mergeNode);
}
@@ -489,7 +492,10 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
AlignedLastQueryScanNode node, DistributionPlanContext context) {
LastQueryNode mergeNode =
new LastQueryNode(
- context.queryContext.getQueryId().genPlanNodeId(), node.getPartitionTimeFilter(), null);
+ context.queryContext.getQueryId().genPlanNodeId(),
+ node.getPartitionTimeFilter(),
+ null,
+ false);
return processRawSeriesScan(node, context, mergeNode);
}
@@ -678,9 +684,10 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
// if the series is from multi regions or order by clause only refer to timeseries, use
// LastQueryMergeNode
if (context.oneSeriesInMultiRegion || node.needOrderByTimeseries()) {
- return new LastQueryMergeNode(id, node.getTimeseriesOrdering());
+ return new LastQueryMergeNode(
+ id, node.getTimeseriesOrdering(), node.isContainsLastTransformNode());
}
- return new LastQueryCollectNode(id);
+ return new LastQueryCollectNode(id, node.isContainsLastTransformNode());
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java
index 38741d91e0e..0bceb53eb5c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java
@@ -60,10 +60,7 @@ public class SubPlan {
public List<PlanFragment> getPlanFragmentList() {
List<PlanFragment> result = new ArrayList<>();
result.add(this.planFragment);
- this.children.forEach(
- child -> {
- result.addAll(child.getPlanFragmentList());
- });
+ this.children.forEach(child -> result.addAll(child.getPlanFragmentList()));
return result;
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
index 77971989322..5e6f4089b1c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformN
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ml.ForecastNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode;
@@ -431,6 +432,14 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter
return render(node, boxValue, context);
}
+ @Override
+ public List<String> visitLastQueryTransform(LastQueryTransformNode node, GraphContext context) {
+ List<String> boxValue = new ArrayList<>(); // lines of text rendered for this node
+ boxValue.add(String.format("LastQueryTransform-%s", node.getPlanNodeId().getId())); // title: node kind plus plan node id
+ boxValue.add(String.format("ViewPath: %s", node.getViewPath())); // the view path carried by the node
+ return render(node, boxValue, context);
+ }
+
@Override
public List<String> visitHorizontallyConcat(HorizontallyConcatNode node, GraphContext context) {
List<String> boxValue = new ArrayList<>();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 180775622a8..d3ed441e563 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -78,6 +78,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformN
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ml.ForecastNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode;
@@ -181,7 +182,8 @@ public enum PlanNodeType {
LOGICAL_VIEW_SCHEMA_SCAN((short) 77),
ALTER_LOGICAL_VIEW((short) 78),
PIPE_ENRICHED_INSERT((short) 79),
- FORECAST((short) 80);
+ FORECAST((short) 80),
+ LAST_QUERY_TRANSFORM((short) 81);
public static final int BYTES = Short.BYTES;
@@ -388,6 +390,8 @@ public enum PlanNodeType {
return PipeEnrichedInsertNode.deserialize(buffer);
case 80:
return ForecastNode.deserialize(buffer);
+ case 81:
+ return LastQueryTransformNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index f1b9943936b..8859fbca46f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -78,6 +78,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformN
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ml.ForecastNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode;
@@ -239,6 +240,10 @@ public abstract class PlanVisitor<R, C> {
return visitMultiChildProcess(node, context);
}
+ public R visitLastQueryTransform(LastQueryTransformNode node, C context) {
+ return visitSingleChildProcess(node, context); // default: fall back to the generic single-child handler
+ }
+
public R visitMergeSort(MergeSortNode node, C context) {
return visitMultiChildProcess(node, context);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java
index e38b851e8dd..fbba5c6d336 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java
@@ -34,8 +34,11 @@ import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.Last
public class LastQueryCollectNode extends MultiChildProcessNode {
- public LastQueryCollectNode(PlanNodeId id) {
+ private boolean containsLastTransformNode;
+
+ public LastQueryCollectNode(PlanNodeId id, boolean containsLastTransformNode) {
super(id);
+ this.containsLastTransformNode = containsLastTransformNode;
}
public LastQueryCollectNode(PlanNodeId id, List<PlanNode> children) {
@@ -54,7 +57,7 @@ public class LastQueryCollectNode extends MultiChildProcessNode {
@Override
public PlanNode clone() {
- return new LastQueryCollectNode(getPlanNodeId());
+ return new LastQueryCollectNode(getPlanNodeId(), containsLastTransformNode);
}
@Override
@@ -99,11 +102,19 @@ public class LastQueryCollectNode extends MultiChildProcessNode {
public static LastQueryCollectNode deserialize(ByteBuffer byteBuffer) {
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new LastQueryCollectNode(planNodeId);
+ return new LastQueryCollectNode(planNodeId, false);
}
@Override
public void setChildren(List<PlanNode> children) {
this.children = children;
}
+
+ public boolean isContainsLastTransformNode() { // reads the containsLastTransformNode flag
+ return this.containsLastTransformNode;
+ }
+
+ public void setContainsLastQueryTransformNode() { // NOTE(review): name differs from the field/getter naming
+ this.containsLastTransformNode = true; // one-way switch: the flag can only be turned on here
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryMergeNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryMergeNode.java
index 3c240f48903..7bdc4b245b0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryMergeNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryMergeNode.java
@@ -40,14 +40,14 @@ public class LastQueryMergeNode extends MultiChildProcessNode {
// The size of this list is 2 and the first SortItem in this list has higher priority.
private final Ordering timeseriesOrdering;
- public LastQueryMergeNode(PlanNodeId id, Ordering timeseriesOrdering) {
- super(id);
- this.timeseriesOrdering = timeseriesOrdering;
- }
+ // true if the children contain a LastQueryTransformNode
+ private boolean containsLastTransformNode;
- public LastQueryMergeNode(PlanNodeId id, List<PlanNode> children, Ordering timeseriesOrdering) {
- super(id, children);
+ public LastQueryMergeNode(
+ PlanNodeId id, Ordering timeseriesOrdering, boolean containsLastTransformNode) {
+ super(id);
this.timeseriesOrdering = timeseriesOrdering;
+ this.containsLastTransformNode = containsLastTransformNode;
}
@Override
@@ -62,7 +62,7 @@ public class LastQueryMergeNode extends MultiChildProcessNode {
@Override
public PlanNode clone() {
- return new LastQueryMergeNode(getPlanNodeId(), timeseriesOrdering);
+ return new LastQueryMergeNode(getPlanNodeId(), timeseriesOrdering, containsLastTransformNode);
}
@Override
@@ -135,7 +135,7 @@ public class LastQueryMergeNode extends MultiChildProcessNode {
timeseriesOrdering = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)];
}
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new LastQueryMergeNode(planNodeId, timeseriesOrdering);
+ return new LastQueryMergeNode(planNodeId, timeseriesOrdering, false);
}
@Override
@@ -146,4 +146,12 @@ public class LastQueryMergeNode extends MultiChildProcessNode {
public Ordering getTimeseriesOrdering() {
return timeseriesOrdering;
}
+
+ public boolean isContainsLastTransformNode() { // reads the containsLastTransformNode flag
+ return this.containsLastTransformNode;
+ }
+
+ public void setContainsNonWritableView() { // NOTE(review): despite its name, this sets containsLastTransformNode
+ this.containsLastTransformNode = true; // one-way switch: the flag can only be turned on here
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java
index a47810a0076..0d6ade61aad 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java
@@ -46,20 +46,30 @@ public class LastQueryNode extends MultiChildProcessNode {
// which is set to null if there is no need to sort
private Ordering timeseriesOrdering;
- public LastQueryNode(PlanNodeId id, Filter timeFilter, @Nullable Ordering timeseriesOrdering) {
+ // true if the children contain a LastQueryTransformNode; this variable is only used in the distribution plan
+ private boolean containsLastTransformNode;
+
+ public LastQueryNode(
+ PlanNodeId id,
+ Filter timeFilter,
+ @Nullable Ordering timeseriesOrdering,
+ boolean containsLastTransformNode) {
super(id);
this.timeFilter = timeFilter;
this.timeseriesOrdering = timeseriesOrdering;
+ this.containsLastTransformNode = containsLastTransformNode;
}
public LastQueryNode(
PlanNodeId id,
List<PlanNode> children,
Filter timeFilter,
- @Nullable Ordering timeseriesOrdering) {
+ @Nullable Ordering timeseriesOrdering,
+ boolean containsLastTransformNode) {
super(id, children);
this.timeFilter = timeFilter;
this.timeseriesOrdering = timeseriesOrdering;
+ this.containsLastTransformNode = containsLastTransformNode;
}
@Override
@@ -74,7 +84,8 @@ public class LastQueryNode extends MultiChildProcessNode {
@Override
public PlanNode clone() {
- return new LastQueryNode(getPlanNodeId(), timeFilter, timeseriesOrdering);
+ return new LastQueryNode(
+ getPlanNodeId(), timeFilter, timeseriesOrdering, containsLastTransformNode);
}
@Override
@@ -163,7 +174,7 @@ public class LastQueryNode extends MultiChildProcessNode {
timeseriesOrdering = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)];
}
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new LastQueryNode(planNodeId, timeFilter, timeseriesOrdering);
+ return new LastQueryNode(planNodeId, timeFilter, timeseriesOrdering, false);
}
@Override
@@ -184,6 +195,14 @@ public class LastQueryNode extends MultiChildProcessNode {
this.timeseriesOrdering = timeseriesOrdering;
}
+ public boolean isContainsLastTransformNode() {
+ return this.containsLastTransformNode;
+ }
+
+ public void setContainsLastTransformNode() {
+ this.containsLastTransformNode = true;
+ }
+
public boolean needOrderByTimeseries() {
return timeseriesOrdering != null;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java
similarity index 53%
copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java
copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java
index e38b851e8dd..fb2133f7dc2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryCollectNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryTransformNode.java
@@ -16,13 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -32,34 +34,27 @@ import java.util.Objects;
import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS;
-public class LastQueryCollectNode extends MultiChildProcessNode {
+public class LastQueryTransformNode extends SingleChildProcessNode {
- public LastQueryCollectNode(PlanNodeId id) {
- super(id);
- }
+ private final String viewPath;
- public LastQueryCollectNode(PlanNodeId id, List<PlanNode> children) {
- super(id, children);
- }
+ private final String dataType;
- @Override
- public List<PlanNode> getChildren() {
- return children;
+ public LastQueryTransformNode(PlanNodeId id, String viewPath, String dataType) {
+ super(id);
+ this.viewPath = viewPath;
+ this.dataType = dataType;
}
- @Override
- public void addChild(PlanNode child) {
- children.add(child);
+ public LastQueryTransformNode(PlanNodeId id, PlanNode aggNode, String viewPath, String dataType) {
+ super(id, aggNode);
+ this.viewPath = viewPath;
+ this.dataType = dataType;
}
@Override
public PlanNode clone() {
- return new LastQueryCollectNode(getPlanNodeId());
- }
-
- @Override
- public int allowedChildCount() {
- return CHILD_COUNT_NO_LIMIT;
+ return new LastQueryTransformNode(getPlanNodeId(), viewPath, dataType);
}
@Override
@@ -68,42 +63,60 @@ public class LastQueryCollectNode extends MultiChildProcessNode {
}
@Override
- public String toString() {
- return String.format("LastQueryCollectNode-%s", this.getPlanNodeId());
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.LAST_QUERY_TRANSFORM.serialize(byteBuffer);
+ ReadWriteIOUtils.write(viewPath, byteBuffer);
+ ReadWriteIOUtils.write(dataType, byteBuffer);
}
@Override
- public boolean equals(Object o) {
- return super.equals(o);
+ protected void serializeAttributes(DataOutputStream stream) throws IOException {
+ PlanNodeType.LAST_QUERY_TRANSFORM.serialize(stream);
+ ReadWriteIOUtils.write(viewPath, stream);
+ ReadWriteIOUtils.write(dataType, stream);
}
- @Override
- public int hashCode() {
- return Objects.hash(super.hashCode());
+ public static LastQueryTransformNode deserialize(ByteBuffer byteBuffer) {
+ String viewPath = ReadWriteIOUtils.readString(byteBuffer);
+ String dataType = ReadWriteIOUtils.readString(byteBuffer);
+ PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+ return new LastQueryTransformNode(planNodeId, viewPath, dataType);
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
- return visitor.visitLastQueryCollect(this, context);
+ return visitor.visitLastQueryTransform(this, context);
}
@Override
- protected void serializeAttributes(ByteBuffer byteBuffer) {
- PlanNodeType.LAST_QUERY_COLLECT.serialize(byteBuffer);
+ public boolean equals(Object o) { // equal when the superclass state, viewPath and dataType all match
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ LastQueryTransformNode that = (LastQueryTransformNode) o;
+ return Objects.equals(viewPath, that.viewPath) && Objects.equals(dataType, that.dataType); // null-safe, like hashCode
+ }
@Override
- protected void serializeAttributes(DataOutputStream stream) throws IOException {
- PlanNodeType.LAST_QUERY_COLLECT.serialize(stream);
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), viewPath, dataType);
}
- public static LastQueryCollectNode deserialize(ByteBuffer byteBuffer) {
- PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new LastQueryCollectNode(planNodeId);
+ public String getViewPath() {
+ return this.viewPath;
}
- @Override
- public void setChildren(List<PlanNode> children) {
- this.children = children;
+ public String getDataType() {
+ return this.dataType;
+ }
+
+ public String getOutputSymbolForSort() {
+ return viewPath;
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/mnode/factory/MemMNodeFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/mnode/factory/MemMNodeFactory.java
index 62f700c398c..a48d21d9230 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/mnode/factory/MemMNodeFactory.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/mnode/factory/MemMNodeFactory.java
@@ -30,7 +30,9 @@ import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.impl.B
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.impl.DatabaseDeviceMNode;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.impl.DatabaseMNode;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.impl.DeviceMNode;
+import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.impl.LogicalViewMNode;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.impl.MeasurementMNode;
+import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.info.LogicalViewInfo;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@MNodeFactory
@@ -76,6 +78,6 @@ public class MemMNodeFactory implements IMNodeFactory<IMemMNode> {
@Override
public IMeasurementMNode<IMemMNode> createLogicalViewMNode(
IDeviceMNode<IMemMNode> parent, String name, IMeasurementInfo measurementInfo) {
- throw new UnsupportedOperationException("View is not supported.");
+ return new LogicalViewMNode(parent, name, ((LogicalViewInfo) measurementInfo).getExpression());
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/mnode/impl/LogicalViewMNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/mnode/impl/LogicalViewMNode.java
new file mode 100644
index 00000000000..4b7382f300c
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/mnode/impl/LogicalViewMNode.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.impl;
+
+import org.apache.iotdb.commons.schema.node.common.AbstractMeasurementMNode;
+import org.apache.iotdb.commons.schema.node.info.IMeasurementInfo;
+import org.apache.iotdb.commons.schema.node.role.IDeviceMNode;
+import org.apache.iotdb.commons.schema.node.utils.IMNodeContainer;
+import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
+import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
+import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.IMemMNode;
+import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.basic.BasicMNode;
+import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.container.MemMNodeContainer;
+import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.info.LogicalViewInfo;
+
+public class LogicalViewMNode extends AbstractMeasurementMNode<IMemMNode, BasicMNode> // IMemMNode for a logical view
+ implements IMemMNode {
+
+ public LogicalViewMNode( // wraps a BasicMNode plus a LogicalViewInfo built from name and viewExpression
+ IDeviceMNode<IMemMNode> parent, String name, ViewExpression viewExpression) {
+ super(
+ new BasicMNode(parent == null ? null : parent.getAsMNode(), name), // a null parent is passed through as null
+ new LogicalViewInfo(new LogicalViewSchema(name, viewExpression)));
+ }
+
+ @Override
+ public IMNodeContainer<IMemMNode> getChildren() {
+ return MemMNodeContainer.emptyMNodeContainer(); // always returns the empty container
+ }
+
+ @Override
+ public IMemMNode getAsMNode() {
+ return this; // this node is itself an IMemMNode
+ }
+
+ public void setExpression(ViewExpression expression) { // replaces the view expression held by the measurement info
+ IMeasurementInfo measurementInfo = this.getMeasurementInfo();
+ if (measurementInfo instanceof LogicalViewInfo) { // silently does nothing for any other info type
+ ((LogicalViewInfo) measurementInfo).setExpression(expression);
+ }
+ }
+
+ @Override
+ public final boolean isLogicalView() {
+ return true; // always true for this node type
+ }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
index 539d90bc84d..0231def964d 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
@@ -48,7 +48,6 @@ import org.apache.iotdb.db.queryengine.execution.operator.process.fill.IFill;
import org.apache.iotdb.db.queryengine.execution.operator.process.fill.linear.LinearFill;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.HorizontallyConcatOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.RowBasedTimeJoinOperator;
-import org.apache.iotdb.db.queryengine.execution.operator.process.last.AbstractUpdateLastCacheOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryCollectOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryMergeOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryOperator;
@@ -346,7 +345,7 @@ public class OperatorMemoryTest {
public void lastQueryOperatorTest() {
TsBlockBuilder builder = Mockito.mock(TsBlockBuilder.class);
Mockito.when(builder.getRetainedSizeInBytes()).thenReturn(1024L);
- List<AbstractUpdateLastCacheOperator> children = new ArrayList<>(4);
+ List<Operator> children = new ArrayList<>(4);
long expectedMaxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
for (int i = 0; i < 4; i++) {
UpdateLastCacheOperator child = Mockito.mock(UpdateLastCacheOperator.class);
@@ -376,7 +375,7 @@ public class OperatorMemoryTest {
TsBlock tsBlock = Mockito.mock(TsBlock.class);
Mockito.when(tsBlock.getRetainedSizeInBytes()).thenReturn(16 * 1024L);
Mockito.when(tsBlock.getPositionCount()).thenReturn(16);
- List<AbstractUpdateLastCacheOperator> children = new ArrayList<>(4);
+ List<Operator> children = new ArrayList<>(4);
for (int i = 0; i < 4; i++) {
UpdateLastCacheOperator child = Mockito.mock(UpdateLastCacheOperator.class);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/QueryLogicalPlanUtil.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/QueryLogicalPlanUtil.java
index b36feab3cff..87ffe9baca7 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/QueryLogicalPlanUtil.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/QueryLogicalPlanUtil.java
@@ -143,7 +143,7 @@ public class QueryLogicalPlanUtil {
LastQueryNode lastQueryNode =
new LastQueryNode(
- queryId.genPlanNodeId(), sourceNodeList, TimeFilter.gt(100), Ordering.ASC);
+ queryId.genPlanNodeId(), sourceNodeList, TimeFilter.gt(100), Ordering.ASC, false);
querySQLs.add(sql);
sqlToPlanMap.put(sql, lastQueryNode);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/LastQueryTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/LastQueryTest.java
index db875ef9b38..13d1586d475 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/LastQueryTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/LastQueryTest.java
@@ -209,7 +209,7 @@ public class LastQueryTest {
}
PlanNode root =
- new LastQueryNode(context.getQueryId().genPlanNodeId(), sourceNodeList, null, null);
+ new LastQueryNode(context.getQueryId().genPlanNodeId(), sourceNodeList, null, null, false);
return new LogicalQueryPlan(context, root);
}
}