You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/23 08:55:06 UTC

[iotdb] branch LastOperator updated (16e1902238 -> 426bd83261)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch LastOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git


    from 16e1902238 Operators related to last query
     add c053a2a766 [IOTDB-3069] unescape string in IoTDBSqlVisitor and ASTVisitor (#5952)
     add c6c4ba398d [IOTDB-3244] Add PlanNodes related to last query (#5987)
     new 8b9c8b0ebc Merge remote-tracking branch 'origin/master' into LastOperator
     new 426bd83261 construct operators related to last query in LocalExecutionPlanner

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |   6 +-
 .../iotdb/db/integration/IoTDBFuzzyQueryIT.java    |   4 +-
 .../IoTDBSyntaxConventionStringLiteralIT.java      |  12 +-
 .../db/integration/IoTDBUDTFBuiltinFunctionIT.java |   2 +-
 .../org/apache/iotdb/commons/path/PartialPath.java |   7 +
 .../db/mpp/execution/operator/LastQueryUtil.java   |  78 +++++++
 .../operator/process/LastQueryMergeOperator.java   |  11 +-
 .../operator/process/UpdateLastCacheOperator.java  |  16 +-
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       |  29 ++-
 .../db/mpp/plan/planner/LocalExecutionPlanner.java | 237 ++++++++++++++++++++-
 .../mpp/plan/planner/plan/node/PlanNodeType.java   |  14 +-
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java |  15 ++
 .../{TimeJoinNode.java => LastQueryMergeNode.java} | 113 +++++-----
 .../plan/node/source/AlignedLastQueryScanNode.java | 171 +++++++++++++++
 .../source/AlignedSeriesAggregationScanNode.java   |   6 +-
 .../plan/node/source/AlignedSeriesScanNode.java    |  13 +-
 .../plan/node/source/LastQueryScanNode.java        | 141 ++++++++++++
 .../node/source/SeriesAggregationScanNode.java     |   6 +-
 .../planner/plan/node/source/SeriesScanNode.java   |  13 +-
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |  29 ++-
 .../iotdb/db/query/executor/LastQueryExecutor.java |   5 +-
 .../mpp/plan/plan/FragmentInstanceSerdeTest.java   |   9 -
 .../node/process/AggregationNodeSerdeTest.java     |   6 +-
 .../node/process/GroupByLevelNodeSerdeTest.java    |  10 +-
 .../plan/plan/node/process/LimitNodeSerdeTest.java |   7 +-
 .../plan/node/process/OffsetNodeSerdeTest.java     |   7 +-
 .../plan/plan/node/process/SortNodeSerdeTest.java  |   7 +-
 .../plan/node/process/TimeJoinNodeSerdeTest.java   |  10 +-
 .../source/SeriesAggregationScanNodeSerdeTest.java |   6 +-
 .../plan/node/source/SeriesScanNodeSerdeTest.java  |   7 +-
 30 files changed, 805 insertions(+), 192 deletions(-)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
 copy server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/{TimeJoinNode.java => LastQueryMergeNode.java} (54%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java


[iotdb] 01/02: Merge remote-tracking branch 'origin/master' into LastOperator

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch LastOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8b9c8b0ebcc09fcfe7097ea1dca42867b0343bc6
Merge: 16e1902238 c6c4ba398d
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon May 23 12:36:04 2022 +0800

    Merge remote-tracking branch 'origin/master' into LastOperator

 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |   6 +-
 .../iotdb/db/integration/IoTDBFuzzyQueryIT.java    |   4 +-
 .../IoTDBSyntaxConventionStringLiteralIT.java      |  12 +-
 .../db/integration/IoTDBUDTFBuiltinFunctionIT.java |   2 +-
 .../org/apache/iotdb/commons/path/PartialPath.java |   7 +
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       |  29 +++-
 .../mpp/plan/planner/plan/node/PlanNodeType.java   |  14 +-
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java |  15 ++
 .../plan/node/process/LastQueryMergeNode.java      | 110 +++++++++++++
 .../plan/node/source/AlignedLastQueryScanNode.java | 171 ++++++++++++++++++++
 .../source/AlignedSeriesAggregationScanNode.java   |   6 +-
 .../plan/node/source/AlignedSeriesScanNode.java    |  13 +-
 .../plan/node/source/LastQueryScanNode.java        | 173 +++++++++++++++++++++
 .../node/source/SeriesAggregationScanNode.java     |   6 +-
 .../planner/plan/node/source/SeriesScanNode.java   |  13 +-
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |  29 +++-
 .../mpp/plan/plan/FragmentInstanceSerdeTest.java   |   9 --
 .../node/process/AggregationNodeSerdeTest.java     |   6 +-
 .../node/process/GroupByLevelNodeSerdeTest.java    |  10 +-
 .../plan/plan/node/process/LimitNodeSerdeTest.java |   7 +-
 .../plan/node/process/OffsetNodeSerdeTest.java     |   7 +-
 .../plan/plan/node/process/SortNodeSerdeTest.java  |   7 +-
 .../plan/node/process/TimeJoinNodeSerdeTest.java   |  10 +-
 .../source/SeriesAggregationScanNodeSerdeTest.java |   6 +-
 .../plan/node/source/SeriesScanNodeSerdeTest.java  |   7 +-
 25 files changed, 563 insertions(+), 116 deletions(-)


[iotdb] 02/02: construct operators related to last query in LocalExecutionPlanner

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch LastOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 426bd83261b18027bf4d2134d73b8dd2746b9582
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon May 23 16:54:59 2022 +0800

    construct operators related to last query in LocalExecutionPlanner
---
 .../db/mpp/execution/operator/LastQueryUtil.java   |  78 +++++++
 .../operator/process/LastQueryMergeOperator.java   |  11 +-
 .../operator/process/UpdateLastCacheOperator.java  |  16 +-
 .../db/mpp/plan/planner/LocalExecutionPlanner.java | 237 ++++++++++++++++++++-
 .../plan/node/process/LastQueryMergeNode.java      |  32 ++-
 .../plan/node/source/LastQueryScanNode.java        |  42 +---
 .../iotdb/db/query/executor/LastQueryExecutor.java |   5 +-
 7 files changed, 361 insertions(+), 60 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
new file mode 100644
index 0000000000..57f1f083a3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.aggregation.LastValueDescAccumulator;
+import org.apache.iotdb.db.mpp.aggregation.MaxTimeDescAccumulator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.operator.Gt;
+import org.apache.iotdb.tsfile.read.filter.operator.GtEq;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class LastQueryUtil {
+
+  public static TsBlockBuilder createTsBlockBuilder() {
+    return new TsBlockBuilder(ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
+  }
+
+  public static TsBlockBuilder createTsBlockBuilder(int initialExpectedEntries) {
+    return new TsBlockBuilder(
+        initialExpectedEntries,
+        ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
+  }
+
+  public static void appendLastValue(
+      TsBlockBuilder builder, long lastTime, String fullPath, String lastValue, String dataType) {
+    // Time
+    builder.getTimeColumnBuilder().writeLong(lastTime);
+    // timeseries
+    builder.getColumnBuilder(0).writeBinary(new Binary(fullPath));
+    // value
+    builder.getColumnBuilder(1).writeBinary(new Binary(lastValue));
+    // dataType
+    builder.getColumnBuilder(2).writeBinary(new Binary(dataType));
+  }
+
+  public static boolean satisfyFilter(Filter filter, TimeValuePair tvPair) {
+    return filter == null || filter.satisfy(tvPair.getTimestamp(), tvPair.getValue().getValue());
+  }
+
+  public static List<Aggregator> createAggregators(TSDataType dataType) {
+    // max_time, last_value
+    List<Aggregator> aggregators = new ArrayList<>(2);
+    aggregators.add(new Aggregator(new MaxTimeDescAccumulator(), AggregationStep.SINGLE));
+    aggregators.add(new Aggregator(new LastValueDescAccumulator(dataType), AggregationStep.SINGLE));
+    return aggregators;
+  }
+
+  public static boolean needUpdateCache(Filter timeFilter) {
+    // Update the cache only when the filter is gt (greater than) or ge (greater than or equal to)
+    return (timeFilter instanceof GtEq) || (timeFilter instanceof Gt);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
index c7e678c884..261abdeb12 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
@@ -55,17 +55,22 @@ public class LastQueryMergeOperator implements ProcessOperator {
 
   @Override
   public TsBlock next() {
-    return children.get(currentIndex++).next();
+    if (children.get(currentIndex).hasNext()) {
+      return children.get(currentIndex).next();
+    } else {
+      currentIndex++;
+      return null;
+    }
   }
 
   @Override
   public boolean hasNext() {
-    return currentIndex < inputOperatorsCount && children.get(currentIndex).hasNext();
+    return currentIndex < inputOperatorsCount;
   }
 
   @Override
   public boolean isFinished() {
-    return currentIndex >= inputOperatorsCount;
+    return !hasNext();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
index 1daab0f0f9..ee0eb898a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
@@ -20,13 +20,13 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
+import org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import com.google.common.collect.ImmutableList;
@@ -69,8 +69,7 @@ public class UpdateLastCacheOperator implements ProcessOperator {
     this.dataType = dataType.name();
     this.lastCache = dataNodeSchemaCache;
     this.needUpdateCache = needUpdateCache;
-    this.tsBlockBuilder =
-        new TsBlockBuilder(1, ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
+    this.tsBlockBuilder = LastQueryUtil.createTsBlockBuilder(1);
   }
 
   @Override
@@ -104,14 +103,9 @@ public class UpdateLastCacheOperator implements ProcessOperator {
     }
 
     tsBlockBuilder.reset();
-    // Time
-    tsBlockBuilder.getTimeColumnBuilder().writeLong(lastTime);
-    // timeseries
-    tsBlockBuilder.getColumnBuilder(0).writeBinary(new Binary(fullPath.getFullPath()));
-    // value
-    tsBlockBuilder.getColumnBuilder(1).writeBinary(new Binary(lastValue.getStringValue()));
-    // dataType
-    tsBlockBuilder.getColumnBuilder(2).writeBinary(new Binary(dataType));
+
+    LastQueryUtil.appendLastValue(
+        tsBlockBuilder, lastTime, fullPath.getFullPath(), lastValue.getStringValue(), dataType);
 
     return tsBlockBuilder.build();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 34f18ad5e9..5844f784d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.db.mpp.plan.planner;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
@@ -35,6 +37,7 @@ import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
 import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
 import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.operator.process.AggregationOperator;
@@ -42,6 +45,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.FilterOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.LastQueryMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
@@ -49,6 +53,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.UpdateLastCacheOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BinaryConstantFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BooleanConstantFill;
@@ -90,11 +95,13 @@ import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesAggregatio
 import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.DataSourceOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.ChildNodesSchemaScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.ChildPathsSchemaScanNode;
@@ -117,14 +124,17 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNullNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
@@ -136,7 +146,11 @@ import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
 import org.apache.iotdb.db.mpp.plan.statement.literal.Literal;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.operator.Gt;
+import org.apache.iotdb.tsfile.read.filter.operator.GtEq;
 
 import org.apache.commons.lang3.Validate;
 
@@ -148,11 +162,13 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil.satisfyFilter;
 
 /**
  * Used to plan a fragment instance. Currently, we simply change it from PlanNode to executable
@@ -164,6 +180,9 @@ public class LocalExecutionPlanner {
   private static final DataBlockManager DATA_BLOCK_MANAGER =
       DataBlockService.getInstance().getDataBlockManager();
 
+  private static final DataNodeSchemaCache DATA_NODE_SCHEMA_CACHE =
+      DataNodeSchemaCache.getInstance();
+
   private static final TimeComparator ASC_TIME_COMPARATOR = new AscTimeComparator();
 
   private static final TimeComparator DESC_TIME_COMPARATOR = new DescTimeComparator();
@@ -485,7 +504,7 @@ public class LocalExecutionPlanner {
           context.instanceContext.addOperatorContext(
               context.getNextOperatorId(),
               node.getPlanNodeId(),
-              SeriesAggregationScanNode.class.getSimpleName());
+              SeriesAggregationScanOperator.class.getSimpleName());
 
       List<Aggregator> aggregators = new ArrayList<>();
       node.getAggregationDescriptorList()
@@ -972,6 +991,187 @@ public class LocalExecutionPlanner {
           ((SchemaDriverContext) (context.instanceContext.getDriverContext())).getSchemaRegion());
     }
 
+    @Override
+    public Operator visitLastQueryScan(LastQueryScanNode node, LocalExecutionPlanContext context) {
+      TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(node.getSeriesPath());
+      if (timeValuePair == null) { // last value is not cached
+        return createUpdateLastCacheOperator(node, context);
+      } else if (!satisfyFilter(
+          context.lastQueryTimeFilter, timeValuePair)) { // cached last value is not satisfied
+
+        boolean isFilterGtOrGe =
+            (context.lastQueryTimeFilter instanceof Gt
+                || context.lastQueryTimeFilter instanceof GtEq);
+        // time filter is not > or >=, we still need to read from disk
+        if (!isFilterGtOrGe) {
+          return createUpdateLastCacheOperator(node, context);
+        } else { // otherwise, we just ignore it and return null
+          return null;
+        }
+      } else { //  cached last value is satisfied, put it into LastCacheScanOperator
+        context.addCachedLastValue(
+            timeValuePair, node.getPlanNodeId(), node.getSeriesPath().getFullPath());
+        return null;
+      }
+    }
+
+    private UpdateLastCacheOperator createUpdateLastCacheOperator(
+        LastQueryScanNode node, LocalExecutionPlanContext context) {
+      SeriesAggregationScanOperator lastQueryScan = createLastQueryScanOperator(node, context);
+
+      return new UpdateLastCacheOperator(
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              UpdateLastCacheOperator.class.getSimpleName()),
+          lastQueryScan,
+          node.getSeriesPath(),
+          node.getSeriesPath().getSeriesType(),
+          DATA_NODE_SCHEMA_CACHE,
+          context.needUpdateLastCache);
+    }
+
+    private SeriesAggregationScanOperator createLastQueryScanOperator(
+        LastQueryScanNode node, LocalExecutionPlanContext context) {
+      MeasurementPath seriesPath = node.getSeriesPath();
+      OperatorContext operatorContext =
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              SeriesAggregationScanOperator.class.getSimpleName());
+
+      // last_time, last_value
+      List<Aggregator> aggregators = LastQueryUtil.createAggregators(seriesPath.getSeriesType());
+
+      SeriesAggregationScanOperator seriesAggregationScanOperator =
+          new SeriesAggregationScanOperator(
+              node.getPlanNodeId(),
+              seriesPath,
+              context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()),
+              operatorContext,
+              aggregators,
+              context.lastQueryTimeFilter,
+              false,
+              null);
+      context.addSourceOperator(seriesAggregationScanOperator);
+      context.addPath(seriesPath);
+      return seriesAggregationScanOperator;
+    }
+
+    @Override
+    public Operator visitAlignedLastQueryScan(
+        AlignedLastQueryScanNode node, LocalExecutionPlanContext context) {
+      TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(node.getSeriesPath());
+      if (timeValuePair == null) { // last value is not cached
+        return createUpdateLastCacheOperator(node, context);
+      } else if (!satisfyFilter(
+          context.lastQueryTimeFilter, timeValuePair)) { // cached last value is not satisfied
+
+        boolean isFilterGtOrGe =
+            (context.lastQueryTimeFilter instanceof Gt
+                || context.lastQueryTimeFilter instanceof GtEq);
+        // time filter is not > or >=, we still need to read from disk
+        if (!isFilterGtOrGe) {
+          return createUpdateLastCacheOperator(node, context);
+        } else { // otherwise, we just ignore it and return null
+          return null;
+        }
+      } else { //  cached last value is satisfied, put it into LastCacheScanOperator
+        context.addCachedLastValue(
+            timeValuePair, node.getPlanNodeId(), node.getSeriesPath().getFullPath());
+        return null;
+      }
+    }
+
+    private UpdateLastCacheOperator createUpdateLastCacheOperator(
+        AlignedLastQueryScanNode node, LocalExecutionPlanContext context) {
+      AlignedSeriesAggregationScanOperator lastQueryScan =
+          createLastQueryScanOperator(node, context);
+
+      return new UpdateLastCacheOperator(
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              UpdateLastCacheOperator.class.getSimpleName()),
+          lastQueryScan,
+          node.getSeriesPath(),
+          node.getSeriesPath().getSeriesType(),
+          DATA_NODE_SCHEMA_CACHE,
+          context.needUpdateLastCache);
+    }
+
+    private AlignedSeriesAggregationScanOperator createLastQueryScanOperator(
+        AlignedLastQueryScanNode node, LocalExecutionPlanContext context) {
+      AlignedPath seriesPath = node.getSeriesPath();
+      OperatorContext operatorContext =
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              AlignedSeriesAggregationScanOperator.class.getSimpleName());
+
+      // last_time, last_value
+      List<Aggregator> aggregators = LastQueryUtil.createAggregators(seriesPath.getSeriesType());
+      AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
+          new AlignedSeriesAggregationScanOperator(
+              node.getPlanNodeId(),
+              seriesPath,
+              operatorContext,
+              aggregators,
+              context.lastQueryTimeFilter,
+              false,
+              null);
+      context.addSourceOperator(seriesAggregationScanOperator);
+      context.addPath(seriesPath);
+      return seriesAggregationScanOperator;
+    }
+
+    @Override
+    public Operator visitLastQueryMerge(
+        LastQueryMergeNode node, LocalExecutionPlanContext context) {
+
+      context.setLastQueryTimeFilter(node.getTimeFilter());
+      context.setNeedUpdateLastCache(LastQueryUtil.needUpdateCache(node.getTimeFilter()));
+
+      List<Operator> operatorList =
+          node.getChildren().stream()
+              .map(child -> child.accept(this, context))
+              .filter(Objects::nonNull)
+              .collect(Collectors.toList());
+
+      List<TimeValuePair> cachedLastValueList = context.getCachedLastValueList();
+
+      if (cachedLastValueList != null && !cachedLastValueList.isEmpty()) {
+        TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(cachedLastValueList.size());
+        for (int i = 0; i < cachedLastValueList.size(); i++) {
+          TimeValuePair timeValuePair = cachedLastValueList.get(i);
+          String fullPath = context.cachedLastValuePathList.get(i);
+          LastQueryUtil.appendLastValue(
+              builder,
+              timeValuePair.getTimestamp(),
+              fullPath,
+              timeValuePair.getValue().getStringValue(),
+              timeValuePair.getValue().getDataType().name());
+        }
+
+        LastCacheScanOperator operator =
+            new LastCacheScanOperator(
+                context.instanceContext.addOperatorContext(
+                    context.getNextOperatorId(),
+                    context.firstCachedPlanNodeId,
+                    LastCacheScanOperator.class.getSimpleName()),
+                context.firstCachedPlanNodeId,
+                builder.build());
+        operatorList.add(operator);
+      }
+
+      return new LastQueryMergeOperator(
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              LastQueryMergeOperator.class.getSimpleName()),
+          operatorList);
+    }
+
     private Map<String, List<InputLocation>> makeLayout(PlanNode node) {
       Map<String, List<InputLocation>> outputMappings = new LinkedHashMap<>();
       int tsBlockIndex = 0;
@@ -1032,6 +1232,18 @@ public class LocalExecutionPlanner {
 
     private TypeProvider typeProvider;
 
+    // cached last value in last query
+    private List<TimeValuePair> cachedLastValueList;
+    // full path for each cached last value; its size should equal that of cachedLastValueList
+    private List<String> cachedLastValuePathList;
+    // PlanNodeId of first LastQueryScanNode/AlignedLastQueryScanNode, it's used for sourceId of
+    // LastCacheScanOperator
+    private PlanNodeId firstCachedPlanNodeId;
+    // timeFilter for last query
+    private Filter lastQueryTimeFilter;
+    // whether we need to update last cache
+    private boolean needUpdateLastCache;
+
     public LocalExecutionPlanContext(
         TypeProvider typeProvider, FragmentInstanceContext instanceContext) {
       this.typeProvider = typeProvider;
@@ -1074,6 +1286,29 @@ public class LocalExecutionPlanner {
       sourceOperators.add(sourceOperator);
     }
 
+    public void setLastQueryTimeFilter(Filter lastQueryTimeFilter) {
+      this.lastQueryTimeFilter = lastQueryTimeFilter;
+    }
+
+    public void setNeedUpdateLastCache(boolean needUpdateLastCache) {
+      this.needUpdateLastCache = needUpdateLastCache;
+    }
+
+    public void addCachedLastValue(
+        TimeValuePair timeValuePair, PlanNodeId planNodeId, String fullPath) {
+      if (cachedLastValueList == null) {
+        cachedLastValueList = new ArrayList<>();
+        cachedLastValuePathList = new ArrayList<>();
+        firstCachedPlanNodeId = planNodeId;
+      }
+      cachedLastValueList.add(timeValuePair);
+      cachedLastValuePathList.add(fullPath);
+    }
+
+    public List<TimeValuePair> getCachedLastValueList() {
+      return cachedLastValueList;
+    }
+
     public ISinkHandle getSinkHandle() {
       return sinkHandle;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
index e8223a42d0..5923bc8942 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
@@ -22,6 +22,11 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import javax.annotation.Nullable;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -35,14 +40,18 @@ public class LastQueryMergeNode extends ProcessNode {
   // make sure child in list has been ordered by their sensor name
   private List<PlanNode> children;
 
-  public LastQueryMergeNode(PlanNodeId id) {
+  private final Filter timeFilter;
+
+  public LastQueryMergeNode(PlanNodeId id, Filter timeFilter) {
     super(id);
     this.children = new ArrayList<>();
+    this.timeFilter = timeFilter;
   }
 
-  public LastQueryMergeNode(PlanNodeId id, List<PlanNode> children) {
+  public LastQueryMergeNode(PlanNodeId id, List<PlanNode> children, Filter timeFilter) {
     super(id);
     this.children = children;
+    this.timeFilter = timeFilter;
   }
 
   @Override
@@ -57,7 +66,7 @@ public class LastQueryMergeNode extends ProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new LastQueryMergeNode(getPlanNodeId());
+    return new LastQueryMergeNode(getPlanNodeId(), timeFilter);
   }
 
   @Override
@@ -97,14 +106,29 @@ public class LastQueryMergeNode extends ProcessNode {
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.LAST_QUERY_MERGE.serialize(byteBuffer);
+    if (timeFilter == null) {
+      ReadWriteIOUtils.write((byte) 0, byteBuffer);
+    } else {
+      ReadWriteIOUtils.write((byte) 1, byteBuffer);
+      timeFilter.serialize(byteBuffer);
+    }
   }
 
   public static LastQueryMergeNode deserialize(ByteBuffer byteBuffer) {
+    Filter timeFilter = null;
+    if (!ReadWriteIOUtils.readIsNull(byteBuffer)) {
+      timeFilter = FilterFactory.deserialize(byteBuffer);
+    }
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new LastQueryMergeNode(planNodeId);
+    return new LastQueryMergeNode(planNodeId, timeFilter);
   }
 
   public void setChildren(List<PlanNode> children) {
     this.children = children;
   }
+
+  @Nullable
+  public Filter getTimeFilter() {
+    return timeFilter;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
index 546c357e82..ab20f9548c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
@@ -25,14 +25,9 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import com.google.common.collect.ImmutableList;
 
-import javax.annotation.Nullable;
-
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Objects;
@@ -45,25 +40,18 @@ public class LastQueryScanNode extends SourceNode {
   // The path of the target series which will be scanned.
   private final MeasurementPath seriesPath;
 
-  private Filter timeFilter;
-
   // The id of DataRegion where the node will run
   private TRegionReplicaSet regionReplicaSet;
 
-  public LastQueryScanNode(PlanNodeId id, MeasurementPath seriesPath, Filter timeFilter) {
+  public LastQueryScanNode(PlanNodeId id, MeasurementPath seriesPath) {
     super(id);
     this.seriesPath = seriesPath;
-    this.timeFilter = timeFilter;
   }
 
   public LastQueryScanNode(
-      PlanNodeId id,
-      MeasurementPath seriesPath,
-      Filter timeFilter,
-      TRegionReplicaSet regionReplicaSet) {
+      PlanNodeId id, MeasurementPath seriesPath, TRegionReplicaSet regionReplicaSet) {
     super(id);
     this.seriesPath = seriesPath;
-    this.timeFilter = timeFilter;
     this.regionReplicaSet = regionReplicaSet;
   }
 
@@ -84,15 +72,6 @@ public class LastQueryScanNode extends SourceNode {
     return seriesPath;
   }
 
-  @Nullable
-  public Filter getTimeFilter() {
-    return timeFilter;
-  }
-
-  public void setTimeFilter(@Nullable Filter timeFilter) {
-    this.timeFilter = timeFilter;
-  }
-
   @Override
   public void close() throws Exception {}
 
@@ -108,7 +87,7 @@ public class LastQueryScanNode extends SourceNode {
 
   @Override
   public PlanNode clone() {
-    return new LastQueryScanNode(getPlanNodeId(), seriesPath, timeFilter, regionReplicaSet);
+    return new LastQueryScanNode(getPlanNodeId(), seriesPath, regionReplicaSet);
   }
 
   @Override
@@ -133,13 +112,12 @@ public class LastQueryScanNode extends SourceNode {
     if (!super.equals(o)) return false;
     LastQueryScanNode that = (LastQueryScanNode) o;
     return Objects.equals(seriesPath, that.seriesPath)
-        && Objects.equals(timeFilter, that.timeFilter)
         && Objects.equals(regionReplicaSet, that.regionReplicaSet);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), seriesPath, timeFilter, regionReplicaSet);
+    return Objects.hash(super.hashCode(), seriesPath, regionReplicaSet);
   }
 
   @Override
@@ -153,21 +131,11 @@ public class LastQueryScanNode extends SourceNode {
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.LAST_QUERY_SCAN.serialize(byteBuffer);
     seriesPath.serialize(byteBuffer);
-    if (timeFilter == null) {
-      ReadWriteIOUtils.write((byte) 0, byteBuffer);
-    } else {
-      ReadWriteIOUtils.write((byte) 1, byteBuffer);
-      timeFilter.serialize(byteBuffer);
-    }
   }
 
   public static LastQueryScanNode deserialize(ByteBuffer byteBuffer) {
     MeasurementPath partialPath = (MeasurementPath) PathDeserializeUtil.deserialize(byteBuffer);
-    Filter timeFilter = null;
-    if (!ReadWriteIOUtils.readIsNull(byteBuffer)) {
-      timeFilter = FilterFactory.deserialize(byteBuffer);
-    }
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new LastQueryScanNode(planNodeId, partialPath, timeFilter);
+    return new LastQueryScanNode(planNodeId, partialPath);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
index 62773b5ed9..af9526b142 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
@@ -67,6 +67,7 @@ import java.util.stream.Collectors;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_DATATYPE;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_VALUE;
+import static org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil.satisfyFilter;
 
 public class LastQueryExecutor {
 
@@ -379,10 +380,6 @@ public class LastQueryExecutor {
     }
   }
 
-  private static boolean satisfyFilter(Filter filter, TimeValuePair tvPair) {
-    return filter == null || filter.satisfy(tvPair.getTimestamp(), tvPair.getValue().getValue());
-  }
-
   public static void clear() {
     ID_TABLE_ENABLED = IoTDBDescriptor.getInstance().getConfig().isEnableIDTable();
   }