You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/23 08:55:08 UTC
[iotdb] 02/02: contruct operators related to last query in LocalExecutionPlanner
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch LastOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 426bd83261b18027bf4d2134d73b8dd2746b9582
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon May 23 16:54:59 2022 +0800
contruct operators related to last query in LocalExecutionPlanner
---
.../db/mpp/execution/operator/LastQueryUtil.java | 78 +++++++
.../operator/process/LastQueryMergeOperator.java | 11 +-
.../operator/process/UpdateLastCacheOperator.java | 16 +-
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 237 ++++++++++++++++++++-
.../plan/node/process/LastQueryMergeNode.java | 32 ++-
.../plan/node/source/LastQueryScanNode.java | 42 +---
.../iotdb/db/query/executor/LastQueryExecutor.java | 5 +-
7 files changed, 361 insertions(+), 60 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
new file mode 100644
index 0000000000..57f1f083a3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.aggregation.LastValueDescAccumulator;
+import org.apache.iotdb.db.mpp.aggregation.MaxTimeDescAccumulator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.operator.Gt;
+import org.apache.iotdb.tsfile.read.filter.operator.GtEq;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class LastQueryUtil {
+
+ public static TsBlockBuilder createTsBlockBuilder() {
+ return new TsBlockBuilder(ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
+ }
+
+ public static TsBlockBuilder createTsBlockBuilder(int initialExpectedEntries) {
+ return new TsBlockBuilder(
+ initialExpectedEntries,
+ ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
+ }
+
+ public static void appendLastValue(
+ TsBlockBuilder builder, long lastTime, String fullPath, String lastValue, String dataType) {
+ // Time
+ builder.getTimeColumnBuilder().writeLong(lastTime);
+ // timeseries
+ builder.getColumnBuilder(0).writeBinary(new Binary(fullPath));
+ // value
+ builder.getColumnBuilder(1).writeBinary(new Binary(lastValue));
+ // dataType
+ builder.getColumnBuilder(2).writeBinary(new Binary(dataType));
+ }
+
+ public static boolean satisfyFilter(Filter filter, TimeValuePair tvPair) {
+ return filter == null || filter.satisfy(tvPair.getTimestamp(), tvPair.getValue().getValue());
+ }
+
+ public static List<Aggregator> createAggregators(TSDataType dataType) {
+ // max_time, last_value
+ List<Aggregator> aggregators = new ArrayList<>(2);
+ aggregators.add(new Aggregator(new MaxTimeDescAccumulator(), AggregationStep.SINGLE));
+ aggregators.add(new Aggregator(new LastValueDescAccumulator(dataType), AggregationStep.SINGLE));
+ return aggregators;
+ }
+
+ public static boolean needUpdateCache(Filter timeFilter) {
+ // Update the cache only when, the filter is gt (greater than) or ge (greater than or equal to)
+ return (timeFilter instanceof GtEq) || (timeFilter instanceof Gt);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
index c7e678c884..261abdeb12 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
@@ -55,17 +55,22 @@ public class LastQueryMergeOperator implements ProcessOperator {
@Override
public TsBlock next() {
- return children.get(currentIndex++).next();
+ if (children.get(currentIndex).hasNext()) {
+ return children.get(currentIndex).next();
+ } else {
+ currentIndex++;
+ return null;
+ }
}
@Override
public boolean hasNext() {
- return currentIndex < inputOperatorsCount && children.get(currentIndex).hasNext();
+ return currentIndex < inputOperatorsCount;
}
@Override
public boolean isFinished() {
- return currentIndex >= inputOperatorsCount;
+ return !hasNext();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
index 1daab0f0f9..ee0eb898a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
@@ -20,13 +20,13 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
+import org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import com.google.common.collect.ImmutableList;
@@ -69,8 +69,7 @@ public class UpdateLastCacheOperator implements ProcessOperator {
this.dataType = dataType.name();
this.lastCache = dataNodeSchemaCache;
this.needUpdateCache = needUpdateCache;
- this.tsBlockBuilder =
- new TsBlockBuilder(1, ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
+ this.tsBlockBuilder = LastQueryUtil.createTsBlockBuilder(1);
}
@Override
@@ -104,14 +103,9 @@ public class UpdateLastCacheOperator implements ProcessOperator {
}
tsBlockBuilder.reset();
- // Time
- tsBlockBuilder.getTimeColumnBuilder().writeLong(lastTime);
- // timeseries
- tsBlockBuilder.getColumnBuilder(0).writeBinary(new Binary(fullPath.getFullPath()));
- // value
- tsBlockBuilder.getColumnBuilder(1).writeBinary(new Binary(lastValue.getStringValue()));
- // dataType
- tsBlockBuilder.getColumnBuilder(2).writeBinary(new Binary(dataType));
+
+ LastQueryUtil.appendLastValue(
+ tsBlockBuilder, lastTime, fullPath.getFullPath(), lastValue.getStringValue(), dataType);
return tsBlockBuilder.build();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 34f18ad5e9..5844f784d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.db.mpp.plan.planner;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
@@ -35,6 +37,7 @@ import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.execution.operator.process.AggregationOperator;
@@ -42,6 +45,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.FilterOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.LastQueryMergeOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
@@ -49,6 +53,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.UpdateLastCacheOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BinaryConstantFill;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BooleanConstantFill;
@@ -90,11 +95,13 @@ import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesAggregatio
import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.DataSourceOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.ChildNodesSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.ChildPathsSchemaScanNode;
@@ -117,14 +124,17 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNullNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
@@ -136,7 +146,11 @@ import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
import org.apache.iotdb.db.mpp.plan.statement.literal.Literal;
import org.apache.iotdb.db.utils.datastructure.TimeSelector;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.operator.Gt;
+import org.apache.iotdb.tsfile.read.filter.operator.GtEq;
import org.apache.commons.lang3.Validate;
@@ -148,11 +162,13 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil.satisfyFilter;
/**
* Used to plan a fragment instance. Currently, we simply change it from PlanNode to executable
@@ -164,6 +180,9 @@ public class LocalExecutionPlanner {
private static final DataBlockManager DATA_BLOCK_MANAGER =
DataBlockService.getInstance().getDataBlockManager();
+ private static final DataNodeSchemaCache DATA_NODE_SCHEMA_CACHE =
+ DataNodeSchemaCache.getInstance();
+
private static final TimeComparator ASC_TIME_COMPARATOR = new AscTimeComparator();
private static final TimeComparator DESC_TIME_COMPARATOR = new DescTimeComparator();
@@ -485,7 +504,7 @@ public class LocalExecutionPlanner {
context.instanceContext.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
- SeriesAggregationScanNode.class.getSimpleName());
+ SeriesAggregationScanOperator.class.getSimpleName());
List<Aggregator> aggregators = new ArrayList<>();
node.getAggregationDescriptorList()
@@ -972,6 +991,187 @@ public class LocalExecutionPlanner {
((SchemaDriverContext) (context.instanceContext.getDriverContext())).getSchemaRegion());
}
+ @Override
+ public Operator visitLastQueryScan(LastQueryScanNode node, LocalExecutionPlanContext context) {
+ TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(node.getSeriesPath());
+ if (timeValuePair == null) { // last value is not cached
+ return createUpdateLastCacheOperator(node, context);
+ } else if (!satisfyFilter(
+ context.lastQueryTimeFilter, timeValuePair)) { // cached last value is not satisfied
+
+ boolean isFilterGtOrGe =
+ (context.lastQueryTimeFilter instanceof Gt
+ || context.lastQueryTimeFilter instanceof GtEq);
+ // time filter is not > or >=, we still need to read from disk
+ if (!isFilterGtOrGe) {
+ return createUpdateLastCacheOperator(node, context);
+ } else { // otherwise, we just ignore it and return null
+ return null;
+ }
+ } else { // cached last value is satisfied, put it into LastCacheScanOperator
+ context.addCachedLastValue(
+ timeValuePair, node.getPlanNodeId(), node.getSeriesPath().getFullPath());
+ return null;
+ }
+ }
+
+ private UpdateLastCacheOperator createUpdateLastCacheOperator(
+ LastQueryScanNode node, LocalExecutionPlanContext context) {
+ SeriesAggregationScanOperator lastQueryScan = createLastQueryScanOperator(node, context);
+
+ return new UpdateLastCacheOperator(
+ context.instanceContext.addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ UpdateLastCacheOperator.class.getSimpleName()),
+ lastQueryScan,
+ node.getSeriesPath(),
+ node.getSeriesPath().getSeriesType(),
+ DATA_NODE_SCHEMA_CACHE,
+ context.needUpdateLastCache);
+ }
+
+ private SeriesAggregationScanOperator createLastQueryScanOperator(
+ LastQueryScanNode node, LocalExecutionPlanContext context) {
+ MeasurementPath seriesPath = node.getSeriesPath();
+ OperatorContext operatorContext =
+ context.instanceContext.addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ SeriesAggregationScanOperator.class.getSimpleName());
+
+ // last_time, last_value
+ List<Aggregator> aggregators = LastQueryUtil.createAggregators(seriesPath.getSeriesType());
+
+ SeriesAggregationScanOperator seriesAggregationScanOperator =
+ new SeriesAggregationScanOperator(
+ node.getPlanNodeId(),
+ seriesPath,
+ context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()),
+ operatorContext,
+ aggregators,
+ context.lastQueryTimeFilter,
+ false,
+ null);
+ context.addSourceOperator(seriesAggregationScanOperator);
+ context.addPath(seriesPath);
+ return seriesAggregationScanOperator;
+ }
+
+ @Override
+ public Operator visitAlignedLastQueryScan(
+ AlignedLastQueryScanNode node, LocalExecutionPlanContext context) {
+ TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(node.getSeriesPath());
+ if (timeValuePair == null) { // last value is not cached
+ return createUpdateLastCacheOperator(node, context);
+ } else if (!satisfyFilter(
+ context.lastQueryTimeFilter, timeValuePair)) { // cached last value is not satisfied
+
+ boolean isFilterGtOrGe =
+ (context.lastQueryTimeFilter instanceof Gt
+ || context.lastQueryTimeFilter instanceof GtEq);
+ // time filter is not > or >=, we still need to read from disk
+ if (!isFilterGtOrGe) {
+ return createUpdateLastCacheOperator(node, context);
+ } else { // otherwise, we just ignore it and return null
+ return null;
+ }
+ } else { // cached last value is satisfied, put it into LastCacheScanOperator
+ context.addCachedLastValue(
+ timeValuePair, node.getPlanNodeId(), node.getSeriesPath().getFullPath());
+ return null;
+ }
+ }
+
+ private UpdateLastCacheOperator createUpdateLastCacheOperator(
+ AlignedLastQueryScanNode node, LocalExecutionPlanContext context) {
+ AlignedSeriesAggregationScanOperator lastQueryScan =
+ createLastQueryScanOperator(node, context);
+
+ return new UpdateLastCacheOperator(
+ context.instanceContext.addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ UpdateLastCacheOperator.class.getSimpleName()),
+ lastQueryScan,
+ node.getSeriesPath(),
+ node.getSeriesPath().getSeriesType(),
+ DATA_NODE_SCHEMA_CACHE,
+ context.needUpdateLastCache);
+ }
+
+ private AlignedSeriesAggregationScanOperator createLastQueryScanOperator(
+ AlignedLastQueryScanNode node, LocalExecutionPlanContext context) {
+ AlignedPath seriesPath = node.getSeriesPath();
+ OperatorContext operatorContext =
+ context.instanceContext.addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ AlignedSeriesAggregationScanOperator.class.getSimpleName());
+
+ // last_time, last_value
+ List<Aggregator> aggregators = LastQueryUtil.createAggregators(seriesPath.getSeriesType());
+ AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
+ new AlignedSeriesAggregationScanOperator(
+ node.getPlanNodeId(),
+ seriesPath,
+ operatorContext,
+ aggregators,
+ context.lastQueryTimeFilter,
+ false,
+ null);
+ context.addSourceOperator(seriesAggregationScanOperator);
+ context.addPath(seriesPath);
+ return seriesAggregationScanOperator;
+ }
+
+ @Override
+ public Operator visitLastQueryMerge(
+ LastQueryMergeNode node, LocalExecutionPlanContext context) {
+
+ context.setLastQueryTimeFilter(node.getTimeFilter());
+ context.setNeedUpdateLastCache(LastQueryUtil.needUpdateCache(node.getTimeFilter()));
+
+ List<Operator> operatorList =
+ node.getChildren().stream()
+ .map(child -> child.accept(this, context))
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+
+ List<TimeValuePair> cachedLastValueList = context.getCachedLastValueList();
+
+ if (cachedLastValueList != null && !cachedLastValueList.isEmpty()) {
+ TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(cachedLastValueList.size());
+ for (int i = 0; i < cachedLastValueList.size(); i++) {
+ TimeValuePair timeValuePair = cachedLastValueList.get(i);
+ String fullPath = context.cachedLastValuePathList.get(i);
+ LastQueryUtil.appendLastValue(
+ builder,
+ timeValuePair.getTimestamp(),
+ fullPath,
+ timeValuePair.getValue().getStringValue(),
+ timeValuePair.getValue().getDataType().name());
+ }
+
+ LastCacheScanOperator operator =
+ new LastCacheScanOperator(
+ context.instanceContext.addOperatorContext(
+ context.getNextOperatorId(),
+ context.firstCachedPlanNodeId,
+ LastCacheScanOperator.class.getSimpleName()),
+ context.firstCachedPlanNodeId,
+ builder.build());
+ operatorList.add(operator);
+ }
+
+ return new LastQueryMergeOperator(
+ context.instanceContext.addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ LastQueryMergeOperator.class.getSimpleName()),
+ operatorList);
+ }
+
private Map<String, List<InputLocation>> makeLayout(PlanNode node) {
Map<String, List<InputLocation>> outputMappings = new LinkedHashMap<>();
int tsBlockIndex = 0;
@@ -1032,6 +1232,18 @@ public class LocalExecutionPlanner {
private TypeProvider typeProvider;
+ // cached last value in last query
+ private List<TimeValuePair> cachedLastValueList;
+ // full path for each cached last value, this size should be equal to cachedLastValueList
+ private List<String> cachedLastValuePathList;
+ // PlanNodeId of first LastQueryScanNode/AlignedLastQueryScanNode, it's used for sourceId of
+ // LastCachedScanOperator
+ private PlanNodeId firstCachedPlanNodeId;
+ // timeFilter for last query
+ private Filter lastQueryTimeFilter;
+ // whether we need to update last cache
+ private boolean needUpdateLastCache;
+
public LocalExecutionPlanContext(
TypeProvider typeProvider, FragmentInstanceContext instanceContext) {
this.typeProvider = typeProvider;
@@ -1074,6 +1286,29 @@ public class LocalExecutionPlanner {
sourceOperators.add(sourceOperator);
}
+ public void setLastQueryTimeFilter(Filter lastQueryTimeFilter) {
+ this.lastQueryTimeFilter = lastQueryTimeFilter;
+ }
+
+ public void setNeedUpdateLastCache(boolean needUpdateLastCache) {
+ this.needUpdateLastCache = needUpdateLastCache;
+ }
+
+ public void addCachedLastValue(
+ TimeValuePair timeValuePair, PlanNodeId planNodeId, String fullPath) {
+ if (cachedLastValueList == null) {
+ cachedLastValueList = new ArrayList<>();
+ cachedLastValuePathList = new ArrayList<>();
+ firstCachedPlanNodeId = planNodeId;
+ }
+ cachedLastValueList.add(timeValuePair);
+ cachedLastValuePathList.add(fullPath);
+ }
+
+ public List<TimeValuePair> getCachedLastValueList() {
+ return cachedLastValueList;
+ }
+
public ISinkHandle getSinkHandle() {
return sinkHandle;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
index e8223a42d0..5923bc8942 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
@@ -22,6 +22,11 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -35,14 +40,18 @@ public class LastQueryMergeNode extends ProcessNode {
// make sure child in list has been ordered by their sensor name
private List<PlanNode> children;
- public LastQueryMergeNode(PlanNodeId id) {
+ private final Filter timeFilter;
+
+ public LastQueryMergeNode(PlanNodeId id, Filter timeFilter) {
super(id);
this.children = new ArrayList<>();
+ this.timeFilter = timeFilter;
}
- public LastQueryMergeNode(PlanNodeId id, List<PlanNode> children) {
+ public LastQueryMergeNode(PlanNodeId id, List<PlanNode> children, Filter timeFilter) {
super(id);
this.children = children;
+ this.timeFilter = timeFilter;
}
@Override
@@ -57,7 +66,7 @@ public class LastQueryMergeNode extends ProcessNode {
@Override
public PlanNode clone() {
- return new LastQueryMergeNode(getPlanNodeId());
+ return new LastQueryMergeNode(getPlanNodeId(), timeFilter);
}
@Override
@@ -97,14 +106,29 @@ public class LastQueryMergeNode extends ProcessNode {
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.LAST_QUERY_MERGE.serialize(byteBuffer);
+ if (timeFilter == null) {
+ ReadWriteIOUtils.write((byte) 0, byteBuffer);
+ } else {
+ ReadWriteIOUtils.write((byte) 1, byteBuffer);
+ timeFilter.serialize(byteBuffer);
+ }
}
public static LastQueryMergeNode deserialize(ByteBuffer byteBuffer) {
+ Filter timeFilter = null;
+ if (!ReadWriteIOUtils.readIsNull(byteBuffer)) {
+ timeFilter = FilterFactory.deserialize(byteBuffer);
+ }
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new LastQueryMergeNode(planNodeId);
+ return new LastQueryMergeNode(planNodeId, timeFilter);
}
public void setChildren(List<PlanNode> children) {
this.children = children;
}
+
+ @Nullable
+ public Filter getTimeFilter() {
+ return timeFilter;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
index 546c357e82..ab20f9548c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
@@ -25,14 +25,9 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import com.google.common.collect.ImmutableList;
-import javax.annotation.Nullable;
-
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
@@ -45,25 +40,18 @@ public class LastQueryScanNode extends SourceNode {
// The path of the target series which will be scanned.
private final MeasurementPath seriesPath;
- private Filter timeFilter;
-
// The id of DataRegion where the node will run
private TRegionReplicaSet regionReplicaSet;
- public LastQueryScanNode(PlanNodeId id, MeasurementPath seriesPath, Filter timeFilter) {
+ public LastQueryScanNode(PlanNodeId id, MeasurementPath seriesPath) {
super(id);
this.seriesPath = seriesPath;
- this.timeFilter = timeFilter;
}
public LastQueryScanNode(
- PlanNodeId id,
- MeasurementPath seriesPath,
- Filter timeFilter,
- TRegionReplicaSet regionReplicaSet) {
+ PlanNodeId id, MeasurementPath seriesPath, TRegionReplicaSet regionReplicaSet) {
super(id);
this.seriesPath = seriesPath;
- this.timeFilter = timeFilter;
this.regionReplicaSet = regionReplicaSet;
}
@@ -84,15 +72,6 @@ public class LastQueryScanNode extends SourceNode {
return seriesPath;
}
- @Nullable
- public Filter getTimeFilter() {
- return timeFilter;
- }
-
- public void setTimeFilter(@Nullable Filter timeFilter) {
- this.timeFilter = timeFilter;
- }
-
@Override
public void close() throws Exception {}
@@ -108,7 +87,7 @@ public class LastQueryScanNode extends SourceNode {
@Override
public PlanNode clone() {
- return new LastQueryScanNode(getPlanNodeId(), seriesPath, timeFilter, regionReplicaSet);
+ return new LastQueryScanNode(getPlanNodeId(), seriesPath, regionReplicaSet);
}
@Override
@@ -133,13 +112,12 @@ public class LastQueryScanNode extends SourceNode {
if (!super.equals(o)) return false;
LastQueryScanNode that = (LastQueryScanNode) o;
return Objects.equals(seriesPath, that.seriesPath)
- && Objects.equals(timeFilter, that.timeFilter)
&& Objects.equals(regionReplicaSet, that.regionReplicaSet);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), seriesPath, timeFilter, regionReplicaSet);
+ return Objects.hash(super.hashCode(), seriesPath, regionReplicaSet);
}
@Override
@@ -153,21 +131,11 @@ public class LastQueryScanNode extends SourceNode {
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.LAST_QUERY_SCAN.serialize(byteBuffer);
seriesPath.serialize(byteBuffer);
- if (timeFilter == null) {
- ReadWriteIOUtils.write((byte) 0, byteBuffer);
- } else {
- ReadWriteIOUtils.write((byte) 1, byteBuffer);
- timeFilter.serialize(byteBuffer);
- }
}
public static LastQueryScanNode deserialize(ByteBuffer byteBuffer) {
MeasurementPath partialPath = (MeasurementPath) PathDeserializeUtil.deserialize(byteBuffer);
- Filter timeFilter = null;
- if (!ReadWriteIOUtils.readIsNull(byteBuffer)) {
- timeFilter = FilterFactory.deserialize(byteBuffer);
- }
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new LastQueryScanNode(planNodeId, partialPath, timeFilter);
+ return new LastQueryScanNode(planNodeId, partialPath);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
index 62773b5ed9..af9526b142 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
@@ -67,6 +67,7 @@ import java.util.stream.Collectors;
import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES;
import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_DATATYPE;
import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_VALUE;
+import static org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil.satisfyFilter;
public class LastQueryExecutor {
@@ -379,10 +380,6 @@ public class LastQueryExecutor {
}
}
- private static boolean satisfyFilter(Filter filter, TimeValuePair tvPair) {
- return filter == null || filter.satisfy(tvPair.getTimestamp(), tvPair.getValue().getValue());
- }
-
public static void clear() {
ID_TABLE_ENABLED = IoTDBDescriptor.getInstance().getConfig().isEnableIDTable();
}