You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/04 10:16:16 UTC
[iotdb] branch rel/1.0 updated: [To rel/1.0] [IOTDB-5026] Improve last query on aligned timeseries
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.0 by this push:
new 96343c9c9e [To rel/1.0] [IOTDB-5026] Improve last query on aligned timeseries
96343c9c9e is described below
commit 96343c9c9ead2c6cf48ad0c3d05d90d189aef9c9
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Sun Dec 4 18:16:10 2022 +0800
[To rel/1.0] [IOTDB-5026] Improve last query on aligned timeseries
---
.../org/apache/iotdb/commons/path/AlignedPath.java | 18 +++++
...r.java => AbstractUpdateLastCacheOperator.java} | 72 +++--------------
.../last/AlignedUpdateLastCacheOperator.java | 89 ++++++++++++++++++++++
.../operator/process/last/LastQueryOperator.java | 6 +-
.../process/last/LastQuerySortOperator.java | 11 +--
.../operator/process/last/LastQueryUtil.java | 18 +++++
.../process/last/UpdateLastCacheOperator.java | 84 +-------------------
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 1 -
.../db/mpp/plan/planner/LogicalPlanBuilder.java | 20 +++--
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 89 +++++++++++++---------
.../db/mpp/plan/planner/SubPlanTypeExtractor.java | 29 +++++++
.../mpp/execution/operator/OperatorMemoryTest.java | 5 +-
12 files changed, 244 insertions(+), 198 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java b/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java
index de2a16c4ad..affcf4b70c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java
@@ -92,6 +92,12 @@ public class AlignedPath extends PartialPath {
measurementList.add(subSensor);
}
+ public AlignedPath(PartialPath vectorPath) {
+ super(vectorPath.getNodes());
+ measurementList = new ArrayList<>();
+ schemaList = new ArrayList<>();
+ }
+
public AlignedPath(MeasurementPath path) {
super(path.getDevicePath().getNodes());
measurementList = new ArrayList<>();
@@ -157,6 +163,18 @@ public class AlignedPath extends PartialPath {
schemaList.add(measurementPath.getMeasurementSchema());
}
+ public void addMeasurement(String measurement, IMeasurementSchema measurementSchema) {
+ if (measurementList == null) {
+ measurementList = new ArrayList<>();
+ }
+ measurementList.add(measurement);
+
+ if (schemaList == null) {
+ schemaList = new ArrayList<>();
+ }
+ schemaList.add(measurementSchema);
+ }
+
/**
* merge another aligned path's sub sensors into this one
*
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
index 8afd9b99de..0b4fad3bda 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
@@ -16,62 +16,45 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.process.last;
-import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
-import static com.google.common.base.Preconditions.checkArgument;
-
-public class UpdateLastCacheOperator implements ProcessOperator {
-
- private static final TsBlock LAST_QUERY_EMPTY_TSBLOCK =
+public abstract class AbstractUpdateLastCacheOperator implements ProcessOperator {
+ protected static final TsBlock LAST_QUERY_EMPTY_TSBLOCK =
new TsBlockBuilder(ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT))
.build();
- private final OperatorContext operatorContext;
-
- private final Operator child;
-
- // fullPath for queried time series
- // It should be exact PartialPath, neither MeasurementPath nor AlignedPath, because lastCache only
- // accept PartialPath
- private final MeasurementPath fullPath;
+ protected OperatorContext operatorContext;
- // dataType for queried time series;
- private final String dataType;
+ protected Operator child;
- private final DataNodeSchemaCache lastCache;
+ protected DataNodeSchemaCache lastCache;
- private final boolean needUpdateCache;
+ protected boolean needUpdateCache;
- private final TsBlockBuilder tsBlockBuilder;
+ protected TsBlockBuilder tsBlockBuilder;
- private String databaseName;
+ protected String databaseName;
- public UpdateLastCacheOperator(
+ public AbstractUpdateLastCacheOperator(
OperatorContext operatorContext,
Operator child,
- MeasurementPath fullPath,
- TSDataType dataType,
DataNodeSchemaCache dataNodeSchemaCache,
boolean needUpdateCache) {
this.operatorContext = operatorContext;
this.child = child;
- this.fullPath = fullPath;
- this.dataType = dataType.name();
this.lastCache = dataNodeSchemaCache;
this.needUpdateCache = needUpdateCache;
this.tsBlockBuilder = LastQueryUtil.createTsBlockBuilder(1);
@@ -87,40 +70,7 @@ public class UpdateLastCacheOperator implements ProcessOperator {
return child.isBlocked();
}
- @Override
- public TsBlock next() {
- TsBlock res = child.next();
- if (res == null) {
- return null;
- }
- if (res.isEmpty()) {
- return LAST_QUERY_EMPTY_TSBLOCK;
- }
-
- checkArgument(res.getPositionCount() == 1, "last query result should only have one record");
-
- // last value is null
- if (res.getColumn(0).isNull(0)) {
- return LAST_QUERY_EMPTY_TSBLOCK;
- }
-
- long lastTime = res.getColumn(0).getLong(0);
- TsPrimitiveType lastValue = res.getColumn(1).getTsPrimitiveType(0);
-
- if (needUpdateCache) {
- TimeValuePair timeValuePair = new TimeValuePair(lastTime, lastValue);
- lastCache.updateLastCache(getDatabaseName(), fullPath, timeValuePair, false, Long.MIN_VALUE);
- }
-
- tsBlockBuilder.reset();
-
- LastQueryUtil.appendLastValue(
- tsBlockBuilder, lastTime, fullPath.getFullPath(), lastValue.getStringValue(), dataType);
-
- return tsBlockBuilder.build();
- }
-
- private String getDatabaseName() {
+ protected String getDatabaseName() {
if (databaseName == null) {
databaseName =
((DataDriverContext) operatorContext.getInstanceContext().getDriverContext())
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
new file mode 100644
index 0000000000..14354cbc3b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.last;
+
+import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import static org.weakref.jmx.internal.guava.base.Preconditions.checkArgument;
+
+/** update last cache for aligned series */
+public class AlignedUpdateLastCacheOperator extends AbstractUpdateLastCacheOperator {
+
+ private final AlignedPath seriesPath;
+
+ private final PartialPath devicePath;
+
+ public AlignedUpdateLastCacheOperator(
+ OperatorContext operatorContext,
+ Operator child,
+ AlignedPath seriesPath,
+ DataNodeSchemaCache dataNodeSchemaCache,
+ boolean needUpdateCache) {
+ super(operatorContext, child, dataNodeSchemaCache, needUpdateCache);
+ this.seriesPath = seriesPath;
+ this.devicePath = seriesPath.getDevicePath();
+ }
+
+ @Override
+ public TsBlock next() {
+ TsBlock res = child.next();
+ if (res == null) {
+ return null;
+ }
+ if (res.isEmpty()) {
+ return LAST_QUERY_EMPTY_TSBLOCK;
+ }
+
+ checkArgument(res.getPositionCount() == 1, "last query result should only have one record");
+
+ tsBlockBuilder.reset();
+ for (int i = 0; i + 1 < res.getValueColumnCount(); i += 2) {
+ if (!res.getColumn(i).isNull(0)) {
+ long lastTime = res.getColumn(i).getLong(0);
+ TsPrimitiveType lastValue = res.getColumn(i + 1).getTsPrimitiveType(0);
+ MeasurementPath measurementPath =
+ new MeasurementPath(
+ devicePath.concatNode(seriesPath.getMeasurementList().get(i / 2)),
+ seriesPath.getSchemaList().get(i / 2),
+ true);
+ if (needUpdateCache) {
+ TimeValuePair timeValuePair = new TimeValuePair(lastTime, lastValue);
+ lastCache.updateLastCache(
+ getDatabaseName(), measurementPath, timeValuePair, false, Long.MIN_VALUE);
+ }
+ LastQueryUtil.appendLastValue(
+ tsBlockBuilder,
+ lastTime,
+ measurementPath.getFullPath(),
+ lastValue.getStringValue(),
+ seriesPath.getSchemaList().get(i / 2).getType().name());
+ }
+ }
+ return !tsBlockBuilder.isEmpty() ? tsBlockBuilder.build() : LAST_QUERY_EMPTY_TSBLOCK;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
index 27ae0becda..7a689d4aa0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
@@ -43,7 +43,7 @@ public class LastQueryOperator implements ProcessOperator {
private final OperatorContext operatorContext;
- private final List<UpdateLastCacheOperator> children;
+ private final List<AbstractUpdateLastCacheOperator> children;
private final int inputOperatorsCount;
@@ -53,7 +53,7 @@ public class LastQueryOperator implements ProcessOperator {
public LastQueryOperator(
OperatorContext operatorContext,
- List<UpdateLastCacheOperator> children,
+ List<AbstractUpdateLastCacheOperator> children,
TsBlockBuilder builder) {
this.operatorContext = operatorContext;
this.children = children;
@@ -109,7 +109,7 @@ public class LastQueryOperator implements ProcessOperator {
if (tsBlock == null) {
return null;
} else if (!tsBlock.isEmpty()) {
- LastQueryUtil.appendLastValue(tsBlockBuilder, tsBlock, 0);
+ LastQueryUtil.appendLastValue(tsBlockBuilder, tsBlock);
}
}
currentIndex++;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
index 082173c29f..aeb38b6b7d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
@@ -55,7 +55,7 @@ public class LastQuerySortOperator implements ProcessOperator {
private int cachedTsBlockRowIndex;
// we must make sure that Operator in children has already been sorted
- private final List<UpdateLastCacheOperator> children;
+ private final List<AbstractUpdateLastCacheOperator> children;
private final OperatorContext operatorContext;
@@ -73,7 +73,7 @@ public class LastQuerySortOperator implements ProcessOperator {
public LastQuerySortOperator(
OperatorContext operatorContext,
TsBlock cachedTsBlock,
- List<UpdateLastCacheOperator> children,
+ List<AbstractUpdateLastCacheOperator> children,
Comparator<Binary> timeSeriesComparator) {
this.cachedTsBlock = cachedTsBlock;
this.cachedTsBlockSize = cachedTsBlock.getPositionCount();
@@ -117,7 +117,7 @@ public class LastQuerySortOperator implements ProcessOperator {
if (canUseDataFromCachedTsBlock(previousTsBlock)) {
LastQueryUtil.appendLastValue(tsBlockBuilder, cachedTsBlock, cachedTsBlockRowIndex++);
} else {
- LastQueryUtil.appendLastValue(tsBlockBuilder, previousTsBlock, 0);
+ LastQueryUtil.appendLastValue(tsBlockBuilder, previousTsBlock);
previousTsBlock = null;
}
}
@@ -144,7 +144,7 @@ public class LastQuerySortOperator implements ProcessOperator {
if (canUseDataFromCachedTsBlock(previousTsBlock)) {
LastQueryUtil.appendLastValue(tsBlockBuilder, cachedTsBlock, cachedTsBlockRowIndex++);
} else {
- LastQueryUtil.appendLastValue(tsBlockBuilder, previousTsBlock, 0);
+ LastQueryUtil.appendLastValue(tsBlockBuilder, previousTsBlock);
previousTsBlock = null;
}
} else {
@@ -157,7 +157,8 @@ public class LastQuerySortOperator implements ProcessOperator {
LastQueryUtil.appendLastValue(tsBlockBuilder, cachedTsBlock, cachedTsBlockRowIndex++);
previousTsBlock = tsBlock;
} else {
- LastQueryUtil.appendLastValue(tsBlockBuilder, tsBlock, 0);
+ // it is safe to append the whole TsBlock
+ LastQueryUtil.appendLastValue(tsBlockBuilder, tsBlock);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryUtil.java
index 3ec62e8877..f8bb75f088 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryUtil.java
@@ -140,6 +140,24 @@ public class LastQueryUtil {
return aggregators;
}
+ public static List<Aggregator> createAggregators(TSDataType dataType, int valueColumnIndex) {
+ // max_time, last_value
+ List<Aggregator> aggregators = new ArrayList<>(2);
+ aggregators.add(
+ new Aggregator(
+ new MaxTimeDescAccumulator(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(
+ new InputLocation[] {new InputLocation(0, valueColumnIndex)})));
+ aggregators.add(
+ new Aggregator(
+ new LastValueDescAccumulator(dataType),
+ AggregationStep.SINGLE,
+ Collections.singletonList(
+ new InputLocation[] {new InputLocation(0, valueColumnIndex)})));
+ return aggregators;
+ }
+
public static boolean needUpdateCache(Filter timeFilter) {
// Update the cache only when, the filter is gt (greater than) or ge (greater than or equal to)
return CACHE_ENABLED && (timeFilter == null || timeFilter instanceof GtEq)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
index 8afd9b99de..7315c81f46 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
@@ -20,46 +20,24 @@ package org.apache.iotdb.db.mpp.execution.operator.process.last;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
-import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.ListenableFuture;
-
import static com.google.common.base.Preconditions.checkArgument;
-public class UpdateLastCacheOperator implements ProcessOperator {
-
- private static final TsBlock LAST_QUERY_EMPTY_TSBLOCK =
- new TsBlockBuilder(ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT))
- .build();
-
- private final OperatorContext operatorContext;
-
- private final Operator child;
+public class UpdateLastCacheOperator extends AbstractUpdateLastCacheOperator {
// fullPath for queried time series
// It should be exact PartialPath, neither MeasurementPath nor AlignedPath, because lastCache only
// accept PartialPath
- private final MeasurementPath fullPath;
+ private MeasurementPath fullPath;
// dataType for queried time series;
- private final String dataType;
-
- private final DataNodeSchemaCache lastCache;
-
- private final boolean needUpdateCache;
-
- private final TsBlockBuilder tsBlockBuilder;
-
- private String databaseName;
+ private String dataType;
public UpdateLastCacheOperator(
OperatorContext operatorContext,
@@ -68,23 +46,9 @@ public class UpdateLastCacheOperator implements ProcessOperator {
TSDataType dataType,
DataNodeSchemaCache dataNodeSchemaCache,
boolean needUpdateCache) {
- this.operatorContext = operatorContext;
- this.child = child;
+ super(operatorContext, child, dataNodeSchemaCache, needUpdateCache);
this.fullPath = fullPath;
this.dataType = dataType.name();
- this.lastCache = dataNodeSchemaCache;
- this.needUpdateCache = needUpdateCache;
- this.tsBlockBuilder = LastQueryUtil.createTsBlockBuilder(1);
- }
-
- @Override
- public OperatorContext getOperatorContext() {
- return operatorContext;
- }
-
- @Override
- public ListenableFuture<?> isBlocked() {
- return child.isBlocked();
}
@Override
@@ -119,44 +83,4 @@ public class UpdateLastCacheOperator implements ProcessOperator {
return tsBlockBuilder.build();
}
-
- private String getDatabaseName() {
- if (databaseName == null) {
- databaseName =
- ((DataDriverContext) operatorContext.getInstanceContext().getDriverContext())
- .getDataRegion()
- .getStorageGroupName();
- }
- return databaseName;
- }
-
- @Override
- public boolean hasNext() {
- return child.hasNext();
- }
-
- @Override
- public boolean isFinished() {
- return child.isFinished();
- }
-
- @Override
- public void close() throws Exception {
- child.close();
- }
-
- @Override
- public long calculateMaxPeekMemory() {
- return child.calculateMaxPeekMemory();
- }
-
- @Override
- public long calculateMaxReturnSize() {
- return child.calculateMaxReturnSize();
- }
-
- @Override
- public long calculateRetainedSizeAfterCallingNext() {
- return child.calculateRetainedSizeAfterCallingNext();
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 5d27adb255..aff917599b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -353,7 +353,6 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
.collect(Collectors.toCollection(LinkedHashSet::new));
}
- sourceExpressions.forEach(expression -> analyzeExpression(analysis, expression));
analysis.setSourceExpressions(sourceExpressions);
analysis.setRespDatasetHeader(DatasetHeaderFactory.getLastQueryHeader());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index 4033ddf52c..a04f877064 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -192,18 +192,22 @@ public class LogicalPlanBuilder {
Filter globalTimeFilter,
OrderByParameter mergeOrderParameter) {
List<PlanNode> sourceNodeList = new ArrayList<>();
- for (Expression sourceExpression : sourceExpressions) {
- MeasurementPath selectPath =
- (MeasurementPath) ((TimeSeriesOperand) sourceExpression).getPath();
- if (selectPath.isUnderAlignedEntity()) {
+ List<PartialPath> selectedPaths =
+ sourceExpressions.stream()
+ .map(expression -> ((TimeSeriesOperand) expression).getPath())
+ .collect(Collectors.toList());
+ List<PartialPath> groupedPaths = MetaUtils.groupAlignedSeries(selectedPaths);
+ for (PartialPath path : groupedPaths) {
+ if (path instanceof MeasurementPath) { // non-aligned series
+ sourceNodeList.add(
+ new LastQueryScanNode(context.getQueryId().genPlanNodeId(), (MeasurementPath) path));
+ } else if (path instanceof AlignedPath) { // aligned series
sourceNodeList.add(
- new AlignedLastQueryScanNode(
- context.getQueryId().genPlanNodeId(), new AlignedPath(selectPath)));
+ new AlignedLastQueryScanNode(context.getQueryId().genPlanNodeId(), (AlignedPath) path));
} else {
- sourceNodeList.add(new LastQueryScanNode(context.getQueryId().genPlanNodeId(), selectPath));
+ throw new IllegalArgumentException("unexpected path type");
}
}
- updateTypeProvider(sourceExpressions);
this.root =
new LastQueryNode(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 484a342715..aea6df9299 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -82,6 +82,8 @@ import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.MultiColumn
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.NonOverlappedMultiColumnMerger;
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.AbstractUpdateLastCacheOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.AlignedUpdateLastCacheOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryCollectOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryMergeOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryOperator;
@@ -1704,34 +1706,46 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
@Override
public Operator visitAlignedLastQueryScan(
AlignedLastQueryScanNode node, LocalExecutionPlanContext context) {
- PartialPath seriesPath = node.getSeriesPath().transformToPartialPath();
- TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(seriesPath);
- if (timeValuePair == null) { // last value is not cached
- return createUpdateLastCacheOperator(
- node, context, node.getSeriesPath().getMeasurementPath());
- } else if (!LastQueryUtil.satisfyFilter(
- updateFilterUsingTTL(context.getLastQueryTimeFilter(), context.getDataRegionTTL()),
- timeValuePair)) { // cached last value is not satisfied
-
- boolean isFilterGtOrGe =
- (context.getLastQueryTimeFilter() instanceof Gt
- || context.getLastQueryTimeFilter() instanceof GtEq);
- // time filter is not > or >=, we still need to read from disk
- if (!isFilterGtOrGe) {
- return createUpdateLastCacheOperator(
- node, context, node.getSeriesPath().getMeasurementPath());
- } else { // otherwise, we just ignore it and return null
- return null;
+ AlignedPath alignedPath = node.getSeriesPath();
+ PartialPath devicePath = alignedPath.getDevicePath();
+ // get series under aligned entity that has not been cached
+ List<Integer> unCachedMeasurementIndexes = new ArrayList<>();
+ List<String> measurementList = alignedPath.getMeasurementList();
+ for (int i = 0; i < measurementList.size(); i++) {
+ PartialPath measurementPath = devicePath.concatNode(measurementList.get(i));
+ TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(measurementPath);
+ if (timeValuePair == null) { // last value is not cached
+ unCachedMeasurementIndexes.add(i);
+ } else if (!LastQueryUtil.satisfyFilter(
+ updateFilterUsingTTL(context.getLastQueryTimeFilter(), context.getDataRegionTTL()),
+ timeValuePair)) { // cached last value is not satisfied
+
+ boolean isFilterGtOrGe =
+ (context.getLastQueryTimeFilter() instanceof Gt
+ || context.getLastQueryTimeFilter() instanceof GtEq);
+ // time filter is not > or >=, we still need to read from disk
+ if (!isFilterGtOrGe) {
+ unCachedMeasurementIndexes.add(i);
+ }
+ } else { // cached last value is satisfied, put it into LastCacheScanOperator
+ context.addCachedLastValue(timeValuePair, measurementPath.getFullPath());
}
- } else { // cached last value is satisfied, put it into LastCacheScanOperator
- context.addCachedLastValue(timeValuePair, seriesPath.getFullPath());
+ }
+ if (unCachedMeasurementIndexes.isEmpty()) {
return null;
+ } else {
+ AlignedPath unCachedPath = new AlignedPath(alignedPath.getDevicePath());
+ for (int i : unCachedMeasurementIndexes) {
+ unCachedPath.addMeasurement(measurementList.get(i), alignedPath.getSchemaList().get(i));
+ }
+ return createAlignedUpdateLastCacheOperator(node, unCachedPath, context);
}
}
- private UpdateLastCacheOperator createUpdateLastCacheOperator(
- AlignedLastQueryScanNode node, LocalExecutionPlanContext context, MeasurementPath fullPath) {
- AlignedSeriesAggregationScanOperator lastQueryScan = createLastQueryScanOperator(node, context);
+ private AlignedUpdateLastCacheOperator createAlignedUpdateLastCacheOperator(
+ AlignedLastQueryScanNode node, AlignedPath unCachedPath, LocalExecutionPlanContext context) {
+ AlignedSeriesAggregationScanOperator lastQueryScan =
+ createLastQueryScanOperator(node, unCachedPath, context);
OperatorContext operatorContext =
context
@@ -1739,20 +1753,18 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
- UpdateLastCacheOperator.class.getSimpleName());
+ AlignedUpdateLastCacheOperator.class.getSimpleName());
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
- return new UpdateLastCacheOperator(
+ return new AlignedUpdateLastCacheOperator(
operatorContext,
lastQueryScan,
- fullPath,
- node.getSeriesPath().getSchemaList().get(0).getType(),
+ unCachedPath,
DATA_NODE_SCHEMA_CACHE,
context.isNeedUpdateLastCache());
}
private AlignedSeriesAggregationScanOperator createLastQueryScanOperator(
- AlignedLastQueryScanNode node, LocalExecutionPlanContext context) {
- AlignedPath seriesPath = node.getSeriesPath();
+ AlignedLastQueryScanNode node, AlignedPath unCachedPath, LocalExecutionPlanContext context) {
OperatorContext operatorContext =
context
.getInstanceContext()
@@ -1762,17 +1774,18 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
AlignedSeriesAggregationScanOperator.class.getSimpleName());
// last_time, last_value
- List<Aggregator> aggregators =
- LastQueryUtil.createAggregators(seriesPath.getSchemaList().get(0).getType());
+ List<Aggregator> aggregators = new ArrayList<>();
+ for (int i = 0; i < unCachedPath.getMeasurementList().size(); i++) {
+ aggregators.addAll(
+ LastQueryUtil.createAggregators(unCachedPath.getSchemaList().get(i).getType(), i));
+ }
ITimeRangeIterator timeRangeIterator = initTimeRangeIterator(null, false, false);
- long maxReturnSize =
- calculateMaxAggregationResultSizeForLastQuery(
- aggregators, seriesPath.transformToPartialPath());
+ long maxReturnSize = calculateMaxAggregationResultSizeForLastQuery(aggregators, unCachedPath);
AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
new AlignedSeriesAggregationScanOperator(
node.getPlanNodeId(),
- seriesPath,
+ unCachedPath,
operatorContext,
aggregators,
timeRangeIterator,
@@ -1781,7 +1794,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
null,
maxReturnSize);
context.addSourceOperator(seriesAggregationScanOperator);
- context.addPath(seriesPath);
+ context.addPath(unCachedPath);
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
return seriesAggregationScanOperator;
}
@@ -1798,11 +1811,11 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
context.setLastQueryTimeFilter(node.getTimeFilter());
context.setNeedUpdateLastCache(LastQueryUtil.needUpdateCache(node.getTimeFilter()));
- List<UpdateLastCacheOperator> operatorList =
+ List<AbstractUpdateLastCacheOperator> operatorList =
node.getChildren().stream()
.map(child -> child.accept(this, context))
.filter(Objects::nonNull)
- .map(o -> (UpdateLastCacheOperator) o)
+ .map(o -> (AbstractUpdateLastCacheOperator) o)
.collect(Collectors.toList());
List<Pair<TimeValuePair, Binary>> cachedLastValueAndPathList =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java
index a95adc10c8..38d46ad5e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java
@@ -27,7 +27,12 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
@@ -105,6 +110,30 @@ public class SubPlanTypeExtractor {
return visitPlan(node, context);
}
+ // region PlanNode of last query
+ // No need to deal with type of last query
+ public Void visitLastQueryScan(LastQueryScanNode node, Void context) {
+ return null;
+ }
+
+ public Void visitAlignedLastQueryScan(AlignedLastQueryScanNode node, Void context) {
+ return null;
+ }
+
+ public Void visitLastQuery(LastQueryNode node, Void context) {
+ return null;
+ }
+
+ public Void visitLastQueryMerge(LastQueryMergeNode node, Void context) {
+ return null;
+ }
+
+ public Void visitLastQueryCollect(LastQueryCollectNode node, Void context) {
+ return null;
+ }
+
+ // end region PlanNode of last query
+
private void updateTypeProviderByAggregationDescriptor(
List<? extends AggregationDescriptor> aggregationDescriptorList) {
aggregationDescriptorList.stream()
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index 7633beedd9..585ed15f43 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill
import org.apache.iotdb.db.mpp.execution.operator.process.join.RowBasedTimeJoinOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.AbstractUpdateLastCacheOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryCollectOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryMergeOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryOperator;
@@ -325,7 +326,7 @@ public class OperatorMemoryTest {
public void lastQueryOperatorTest() {
TsBlockBuilder builder = Mockito.mock(TsBlockBuilder.class);
Mockito.when(builder.getRetainedSizeInBytes()).thenReturn(1024L);
- List<UpdateLastCacheOperator> children = new ArrayList<>(4);
+ List<AbstractUpdateLastCacheOperator> children = new ArrayList<>(4);
long expectedMaxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
for (int i = 0; i < 4; i++) {
UpdateLastCacheOperator child = Mockito.mock(UpdateLastCacheOperator.class);
@@ -355,7 +356,7 @@ public class OperatorMemoryTest {
TsBlock tsBlock = Mockito.mock(TsBlock.class);
Mockito.when(tsBlock.getRetainedSizeInBytes()).thenReturn(16 * 1024L);
Mockito.when(tsBlock.getPositionCount()).thenReturn(16);
- List<UpdateLastCacheOperator> children = new ArrayList<>(4);
+ List<AbstractUpdateLastCacheOperator> children = new ArrayList<>(4);
for (int i = 0; i < 4; i++) {
UpdateLastCacheOperator child = Mockito.mock(UpdateLastCacheOperator.class);