You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/23 08:55:08 UTC

[iotdb] 02/02: contruct operators related to last query in LocalExecutionPlanner

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch LastOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 426bd83261b18027bf4d2134d73b8dd2746b9582
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon May 23 16:54:59 2022 +0800

    contruct operators related to last query in LocalExecutionPlanner
---
 .../db/mpp/execution/operator/LastQueryUtil.java   |  78 +++++++
 .../operator/process/LastQueryMergeOperator.java   |  11 +-
 .../operator/process/UpdateLastCacheOperator.java  |  16 +-
 .../db/mpp/plan/planner/LocalExecutionPlanner.java | 237 ++++++++++++++++++++-
 .../plan/node/process/LastQueryMergeNode.java      |  32 ++-
 .../plan/node/source/LastQueryScanNode.java        |  42 +---
 .../iotdb/db/query/executor/LastQueryExecutor.java |   5 +-
 7 files changed, 361 insertions(+), 60 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
new file mode 100644
index 0000000000..57f1f083a3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.aggregation.LastValueDescAccumulator;
+import org.apache.iotdb.db.mpp.aggregation.MaxTimeDescAccumulator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.operator.Gt;
+import org.apache.iotdb.tsfile.read.filter.operator.GtEq;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class LastQueryUtil {
+
+  public static TsBlockBuilder createTsBlockBuilder() {
+    return new TsBlockBuilder(ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
+  }
+
+  public static TsBlockBuilder createTsBlockBuilder(int initialExpectedEntries) {
+    return new TsBlockBuilder(
+        initialExpectedEntries,
+        ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
+  }
+
+  public static void appendLastValue(
+      TsBlockBuilder builder, long lastTime, String fullPath, String lastValue, String dataType) {
+    // Time
+    builder.getTimeColumnBuilder().writeLong(lastTime);
+    // timeseries
+    builder.getColumnBuilder(0).writeBinary(new Binary(fullPath));
+    // value
+    builder.getColumnBuilder(1).writeBinary(new Binary(lastValue));
+    // dataType
+    builder.getColumnBuilder(2).writeBinary(new Binary(dataType));
+  }
+
+  public static boolean satisfyFilter(Filter filter, TimeValuePair tvPair) {
+    return filter == null || filter.satisfy(tvPair.getTimestamp(), tvPair.getValue().getValue());
+  }
+
+  public static List<Aggregator> createAggregators(TSDataType dataType) {
+    // max_time, last_value
+    List<Aggregator> aggregators = new ArrayList<>(2);
+    aggregators.add(new Aggregator(new MaxTimeDescAccumulator(), AggregationStep.SINGLE));
+    aggregators.add(new Aggregator(new LastValueDescAccumulator(dataType), AggregationStep.SINGLE));
+    return aggregators;
+  }
+
+  public static boolean needUpdateCache(Filter timeFilter) {
+    // Update the cache only when, the filter is gt (greater than) or ge (greater than or equal to)
+    return (timeFilter instanceof GtEq) || (timeFilter instanceof Gt);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
index c7e678c884..261abdeb12 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
@@ -55,17 +55,22 @@ public class LastQueryMergeOperator implements ProcessOperator {
 
   @Override
   public TsBlock next() {
-    return children.get(currentIndex++).next();
+    if (children.get(currentIndex).hasNext()) {
+      return children.get(currentIndex).next();
+    } else {
+      currentIndex++;
+      return null;
+    }
   }
 
   @Override
   public boolean hasNext() {
-    return currentIndex < inputOperatorsCount && children.get(currentIndex).hasNext();
+    return currentIndex < inputOperatorsCount;
   }
 
   @Override
   public boolean isFinished() {
-    return currentIndex >= inputOperatorsCount;
+    return !hasNext();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
index 1daab0f0f9..ee0eb898a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
@@ -20,13 +20,13 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
+import org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import com.google.common.collect.ImmutableList;
@@ -69,8 +69,7 @@ public class UpdateLastCacheOperator implements ProcessOperator {
     this.dataType = dataType.name();
     this.lastCache = dataNodeSchemaCache;
     this.needUpdateCache = needUpdateCache;
-    this.tsBlockBuilder =
-        new TsBlockBuilder(1, ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
+    this.tsBlockBuilder = LastQueryUtil.createTsBlockBuilder(1);
   }
 
   @Override
@@ -104,14 +103,9 @@ public class UpdateLastCacheOperator implements ProcessOperator {
     }
 
     tsBlockBuilder.reset();
-    // Time
-    tsBlockBuilder.getTimeColumnBuilder().writeLong(lastTime);
-    // timeseries
-    tsBlockBuilder.getColumnBuilder(0).writeBinary(new Binary(fullPath.getFullPath()));
-    // value
-    tsBlockBuilder.getColumnBuilder(1).writeBinary(new Binary(lastValue.getStringValue()));
-    // dataType
-    tsBlockBuilder.getColumnBuilder(2).writeBinary(new Binary(dataType));
+
+    LastQueryUtil.appendLastValue(
+        tsBlockBuilder, lastTime, fullPath.getFullPath(), lastValue.getStringValue(), dataType);
 
     return tsBlockBuilder.build();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 34f18ad5e9..5844f784d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.db.mpp.plan.planner;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
@@ -35,6 +37,7 @@ import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
 import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
 import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.operator.process.AggregationOperator;
@@ -42,6 +45,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.FilterOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.LastQueryMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
@@ -49,6 +53,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.UpdateLastCacheOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BinaryConstantFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BooleanConstantFill;
@@ -90,11 +95,13 @@ import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesAggregatio
 import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.DataSourceOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.ChildNodesSchemaScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.ChildPathsSchemaScanNode;
@@ -117,14 +124,17 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNullNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
@@ -136,7 +146,11 @@ import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
 import org.apache.iotdb.db.mpp.plan.statement.literal.Literal;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.operator.Gt;
+import org.apache.iotdb.tsfile.read.filter.operator.GtEq;
 
 import org.apache.commons.lang3.Validate;
 
@@ -148,11 +162,13 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil.satisfyFilter;
 
 /**
  * Used to plan a fragment instance. Currently, we simply change it from PlanNode to executable
@@ -164,6 +180,9 @@ public class LocalExecutionPlanner {
   private static final DataBlockManager DATA_BLOCK_MANAGER =
       DataBlockService.getInstance().getDataBlockManager();
 
+  private static final DataNodeSchemaCache DATA_NODE_SCHEMA_CACHE =
+      DataNodeSchemaCache.getInstance();
+
   private static final TimeComparator ASC_TIME_COMPARATOR = new AscTimeComparator();
 
   private static final TimeComparator DESC_TIME_COMPARATOR = new DescTimeComparator();
@@ -485,7 +504,7 @@ public class LocalExecutionPlanner {
           context.instanceContext.addOperatorContext(
               context.getNextOperatorId(),
               node.getPlanNodeId(),
-              SeriesAggregationScanNode.class.getSimpleName());
+              SeriesAggregationScanOperator.class.getSimpleName());
 
       List<Aggregator> aggregators = new ArrayList<>();
       node.getAggregationDescriptorList()
@@ -972,6 +991,187 @@ public class LocalExecutionPlanner {
           ((SchemaDriverContext) (context.instanceContext.getDriverContext())).getSchemaRegion());
     }
 
+    @Override
+    public Operator visitLastQueryScan(LastQueryScanNode node, LocalExecutionPlanContext context) {
+      TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(node.getSeriesPath());
+      if (timeValuePair == null) { // last value is not cached
+        return createUpdateLastCacheOperator(node, context);
+      } else if (!satisfyFilter(
+          context.lastQueryTimeFilter, timeValuePair)) { // cached last value is not satisfied
+
+        boolean isFilterGtOrGe =
+            (context.lastQueryTimeFilter instanceof Gt
+                || context.lastQueryTimeFilter instanceof GtEq);
+        // time filter is not > or >=, we still need to read from disk
+        if (!isFilterGtOrGe) {
+          return createUpdateLastCacheOperator(node, context);
+        } else { // otherwise, we just ignore it and return null
+          return null;
+        }
+      } else { //  cached last value is satisfied, put it into LastCacheScanOperator
+        context.addCachedLastValue(
+            timeValuePair, node.getPlanNodeId(), node.getSeriesPath().getFullPath());
+        return null;
+      }
+    }
+
+    private UpdateLastCacheOperator createUpdateLastCacheOperator(
+        LastQueryScanNode node, LocalExecutionPlanContext context) {
+      SeriesAggregationScanOperator lastQueryScan = createLastQueryScanOperator(node, context);
+
+      return new UpdateLastCacheOperator(
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              UpdateLastCacheOperator.class.getSimpleName()),
+          lastQueryScan,
+          node.getSeriesPath(),
+          node.getSeriesPath().getSeriesType(),
+          DATA_NODE_SCHEMA_CACHE,
+          context.needUpdateLastCache);
+    }
+
+    private SeriesAggregationScanOperator createLastQueryScanOperator(
+        LastQueryScanNode node, LocalExecutionPlanContext context) {
+      MeasurementPath seriesPath = node.getSeriesPath();
+      OperatorContext operatorContext =
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              SeriesAggregationScanOperator.class.getSimpleName());
+
+      // last_time, last_value
+      List<Aggregator> aggregators = LastQueryUtil.createAggregators(seriesPath.getSeriesType());
+
+      SeriesAggregationScanOperator seriesAggregationScanOperator =
+          new SeriesAggregationScanOperator(
+              node.getPlanNodeId(),
+              seriesPath,
+              context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()),
+              operatorContext,
+              aggregators,
+              context.lastQueryTimeFilter,
+              false,
+              null);
+      context.addSourceOperator(seriesAggregationScanOperator);
+      context.addPath(seriesPath);
+      return seriesAggregationScanOperator;
+    }
+
+    @Override
+    public Operator visitAlignedLastQueryScan(
+        AlignedLastQueryScanNode node, LocalExecutionPlanContext context) {
+      TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(node.getSeriesPath());
+      if (timeValuePair == null) { // last value is not cached
+        return createUpdateLastCacheOperator(node, context);
+      } else if (!satisfyFilter(
+          context.lastQueryTimeFilter, timeValuePair)) { // cached last value is not satisfied
+
+        boolean isFilterGtOrGe =
+            (context.lastQueryTimeFilter instanceof Gt
+                || context.lastQueryTimeFilter instanceof GtEq);
+        // time filter is not > or >=, we still need to read from disk
+        if (!isFilterGtOrGe) {
+          return createUpdateLastCacheOperator(node, context);
+        } else { // otherwise, we just ignore it and return null
+          return null;
+        }
+      } else { //  cached last value is satisfied, put it into LastCacheScanOperator
+        context.addCachedLastValue(
+            timeValuePair, node.getPlanNodeId(), node.getSeriesPath().getFullPath());
+        return null;
+      }
+    }
+
+    private UpdateLastCacheOperator createUpdateLastCacheOperator(
+        AlignedLastQueryScanNode node, LocalExecutionPlanContext context) {
+      AlignedSeriesAggregationScanOperator lastQueryScan =
+          createLastQueryScanOperator(node, context);
+
+      return new UpdateLastCacheOperator(
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              UpdateLastCacheOperator.class.getSimpleName()),
+          lastQueryScan,
+          node.getSeriesPath(),
+          node.getSeriesPath().getSeriesType(),
+          DATA_NODE_SCHEMA_CACHE,
+          context.needUpdateLastCache);
+    }
+
+    private AlignedSeriesAggregationScanOperator createLastQueryScanOperator(
+        AlignedLastQueryScanNode node, LocalExecutionPlanContext context) {
+      AlignedPath seriesPath = node.getSeriesPath();
+      OperatorContext operatorContext =
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              AlignedSeriesAggregationScanOperator.class.getSimpleName());
+
+      // last_time, last_value
+      List<Aggregator> aggregators = LastQueryUtil.createAggregators(seriesPath.getSeriesType());
+      AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
+          new AlignedSeriesAggregationScanOperator(
+              node.getPlanNodeId(),
+              seriesPath,
+              operatorContext,
+              aggregators,
+              context.lastQueryTimeFilter,
+              false,
+              null);
+      context.addSourceOperator(seriesAggregationScanOperator);
+      context.addPath(seriesPath);
+      return seriesAggregationScanOperator;
+    }
+
+    @Override
+    public Operator visitLastQueryMerge(
+        LastQueryMergeNode node, LocalExecutionPlanContext context) {
+
+      context.setLastQueryTimeFilter(node.getTimeFilter());
+      context.setNeedUpdateLastCache(LastQueryUtil.needUpdateCache(node.getTimeFilter()));
+
+      List<Operator> operatorList =
+          node.getChildren().stream()
+              .map(child -> child.accept(this, context))
+              .filter(Objects::nonNull)
+              .collect(Collectors.toList());
+
+      List<TimeValuePair> cachedLastValueList = context.getCachedLastValueList();
+
+      if (cachedLastValueList != null && !cachedLastValueList.isEmpty()) {
+        TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(cachedLastValueList.size());
+        for (int i = 0; i < cachedLastValueList.size(); i++) {
+          TimeValuePair timeValuePair = cachedLastValueList.get(i);
+          String fullPath = context.cachedLastValuePathList.get(i);
+          LastQueryUtil.appendLastValue(
+              builder,
+              timeValuePair.getTimestamp(),
+              fullPath,
+              timeValuePair.getValue().getStringValue(),
+              timeValuePair.getValue().getDataType().name());
+        }
+
+        LastCacheScanOperator operator =
+            new LastCacheScanOperator(
+                context.instanceContext.addOperatorContext(
+                    context.getNextOperatorId(),
+                    context.firstCachedPlanNodeId,
+                    LastCacheScanOperator.class.getSimpleName()),
+                context.firstCachedPlanNodeId,
+                builder.build());
+        operatorList.add(operator);
+      }
+
+      return new LastQueryMergeOperator(
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              LastQueryMergeOperator.class.getSimpleName()),
+          operatorList);
+    }
+
     private Map<String, List<InputLocation>> makeLayout(PlanNode node) {
       Map<String, List<InputLocation>> outputMappings = new LinkedHashMap<>();
       int tsBlockIndex = 0;
@@ -1032,6 +1232,18 @@ public class LocalExecutionPlanner {
 
     private TypeProvider typeProvider;
 
+    // cached last value in last query
+    private List<TimeValuePair> cachedLastValueList;
+    // full path for each cached last value, this size should be equal to cachedLastValueList
+    private List<String> cachedLastValuePathList;
+    // PlanNodeId of first LastQueryScanNode/AlignedLastQueryScanNode, it's used for sourceId of
+    // LastCachedScanOperator
+    private PlanNodeId firstCachedPlanNodeId;
+    // timeFilter for last query
+    private Filter lastQueryTimeFilter;
+    // whether we need to update last cache
+    private boolean needUpdateLastCache;
+
     public LocalExecutionPlanContext(
         TypeProvider typeProvider, FragmentInstanceContext instanceContext) {
       this.typeProvider = typeProvider;
@@ -1074,6 +1286,29 @@ public class LocalExecutionPlanner {
       sourceOperators.add(sourceOperator);
     }
 
+    public void setLastQueryTimeFilter(Filter lastQueryTimeFilter) {
+      this.lastQueryTimeFilter = lastQueryTimeFilter;
+    }
+
+    public void setNeedUpdateLastCache(boolean needUpdateLastCache) {
+      this.needUpdateLastCache = needUpdateLastCache;
+    }
+
+    public void addCachedLastValue(
+        TimeValuePair timeValuePair, PlanNodeId planNodeId, String fullPath) {
+      if (cachedLastValueList == null) {
+        cachedLastValueList = new ArrayList<>();
+        cachedLastValuePathList = new ArrayList<>();
+        firstCachedPlanNodeId = planNodeId;
+      }
+      cachedLastValueList.add(timeValuePair);
+      cachedLastValuePathList.add(fullPath);
+    }
+
+    public List<TimeValuePair> getCachedLastValueList() {
+      return cachedLastValueList;
+    }
+
     public ISinkHandle getSinkHandle() {
       return sinkHandle;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
index e8223a42d0..5923bc8942 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
@@ -22,6 +22,11 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import javax.annotation.Nullable;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -35,14 +40,18 @@ public class LastQueryMergeNode extends ProcessNode {
   // make sure child in list has been ordered by their sensor name
   private List<PlanNode> children;
 
-  public LastQueryMergeNode(PlanNodeId id) {
+  private final Filter timeFilter;
+
+  public LastQueryMergeNode(PlanNodeId id, Filter timeFilter) {
     super(id);
     this.children = new ArrayList<>();
+    this.timeFilter = timeFilter;
   }
 
-  public LastQueryMergeNode(PlanNodeId id, List<PlanNode> children) {
+  public LastQueryMergeNode(PlanNodeId id, List<PlanNode> children, Filter timeFilter) {
     super(id);
     this.children = children;
+    this.timeFilter = timeFilter;
   }
 
   @Override
@@ -57,7 +66,7 @@ public class LastQueryMergeNode extends ProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new LastQueryMergeNode(getPlanNodeId());
+    return new LastQueryMergeNode(getPlanNodeId(), timeFilter);
   }
 
   @Override
@@ -97,14 +106,29 @@ public class LastQueryMergeNode extends ProcessNode {
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.LAST_QUERY_MERGE.serialize(byteBuffer);
+    if (timeFilter == null) {
+      ReadWriteIOUtils.write((byte) 0, byteBuffer);
+    } else {
+      ReadWriteIOUtils.write((byte) 1, byteBuffer);
+      timeFilter.serialize(byteBuffer);
+    }
   }
 
   public static LastQueryMergeNode deserialize(ByteBuffer byteBuffer) {
+    Filter timeFilter = null;
+    if (!ReadWriteIOUtils.readIsNull(byteBuffer)) {
+      timeFilter = FilterFactory.deserialize(byteBuffer);
+    }
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new LastQueryMergeNode(planNodeId);
+    return new LastQueryMergeNode(planNodeId, timeFilter);
   }
 
   public void setChildren(List<PlanNode> children) {
     this.children = children;
   }
+
+  @Nullable
+  public Filter getTimeFilter() {
+    return timeFilter;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
index 546c357e82..ab20f9548c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
@@ -25,14 +25,9 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import com.google.common.collect.ImmutableList;
 
-import javax.annotation.Nullable;
-
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Objects;
@@ -45,25 +40,18 @@ public class LastQueryScanNode extends SourceNode {
   // The path of the target series which will be scanned.
   private final MeasurementPath seriesPath;
 
-  private Filter timeFilter;
-
   // The id of DataRegion where the node will run
   private TRegionReplicaSet regionReplicaSet;
 
-  public LastQueryScanNode(PlanNodeId id, MeasurementPath seriesPath, Filter timeFilter) {
+  public LastQueryScanNode(PlanNodeId id, MeasurementPath seriesPath) {
     super(id);
     this.seriesPath = seriesPath;
-    this.timeFilter = timeFilter;
   }
 
   public LastQueryScanNode(
-      PlanNodeId id,
-      MeasurementPath seriesPath,
-      Filter timeFilter,
-      TRegionReplicaSet regionReplicaSet) {
+      PlanNodeId id, MeasurementPath seriesPath, TRegionReplicaSet regionReplicaSet) {
     super(id);
     this.seriesPath = seriesPath;
-    this.timeFilter = timeFilter;
     this.regionReplicaSet = regionReplicaSet;
   }
 
@@ -84,15 +72,6 @@ public class LastQueryScanNode extends SourceNode {
     return seriesPath;
   }
 
-  @Nullable
-  public Filter getTimeFilter() {
-    return timeFilter;
-  }
-
-  public void setTimeFilter(@Nullable Filter timeFilter) {
-    this.timeFilter = timeFilter;
-  }
-
   @Override
   public void close() throws Exception {}
 
@@ -108,7 +87,7 @@ public class LastQueryScanNode extends SourceNode {
 
   @Override
   public PlanNode clone() {
-    return new LastQueryScanNode(getPlanNodeId(), seriesPath, timeFilter, regionReplicaSet);
+    return new LastQueryScanNode(getPlanNodeId(), seriesPath, regionReplicaSet);
   }
 
   @Override
@@ -133,13 +112,12 @@ public class LastQueryScanNode extends SourceNode {
     if (!super.equals(o)) return false;
     LastQueryScanNode that = (LastQueryScanNode) o;
     return Objects.equals(seriesPath, that.seriesPath)
-        && Objects.equals(timeFilter, that.timeFilter)
         && Objects.equals(regionReplicaSet, that.regionReplicaSet);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), seriesPath, timeFilter, regionReplicaSet);
+    return Objects.hash(super.hashCode(), seriesPath, regionReplicaSet);
   }
 
   @Override
@@ -153,21 +131,11 @@ public class LastQueryScanNode extends SourceNode {
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.LAST_QUERY_SCAN.serialize(byteBuffer);
     seriesPath.serialize(byteBuffer);
-    if (timeFilter == null) {
-      ReadWriteIOUtils.write((byte) 0, byteBuffer);
-    } else {
-      ReadWriteIOUtils.write((byte) 1, byteBuffer);
-      timeFilter.serialize(byteBuffer);
-    }
   }
 
   public static LastQueryScanNode deserialize(ByteBuffer byteBuffer) {
     MeasurementPath partialPath = (MeasurementPath) PathDeserializeUtil.deserialize(byteBuffer);
-    Filter timeFilter = null;
-    if (!ReadWriteIOUtils.readIsNull(byteBuffer)) {
-      timeFilter = FilterFactory.deserialize(byteBuffer);
-    }
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new LastQueryScanNode(planNodeId, partialPath, timeFilter);
+    return new LastQueryScanNode(planNodeId, partialPath);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
index 62773b5ed9..af9526b142 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
@@ -67,6 +67,7 @@ import java.util.stream.Collectors;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_DATATYPE;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_VALUE;
+import static org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil.satisfyFilter;
 
 public class LastQueryExecutor {
 
@@ -379,10 +380,6 @@ public class LastQueryExecutor {
     }
   }
 
-  private static boolean satisfyFilter(Filter filter, TimeValuePair tvPair) {
-    return filter == null || filter.satisfy(tvPair.getTimestamp(), tvPair.getValue().getValue());
-  }
-
   public static void clear() {
     ID_TABLE_ENABLED = IoTDBDescriptor.getInstance().getConfig().isEnableIDTable();
   }