You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/04 10:16:16 UTC

[iotdb] branch rel/1.0 updated: [To rel/1.0] [IOTDB-5026] Improve last query on aligned timeseries

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.0 by this push:
     new 96343c9c9e [To rel/1.0] [IOTDB-5026] Improve last query on aligned timeseries
96343c9c9e is described below

commit 96343c9c9ead2c6cf48ad0c3d05d90d189aef9c9
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Sun Dec 4 18:16:10 2022 +0800

    [To rel/1.0] [IOTDB-5026] Improve last query on aligned timeseries
---
 .../org/apache/iotdb/commons/path/AlignedPath.java | 18 +++++
 ...r.java => AbstractUpdateLastCacheOperator.java} | 72 +++--------------
 .../last/AlignedUpdateLastCacheOperator.java       | 89 ++++++++++++++++++++++
 .../operator/process/last/LastQueryOperator.java   |  6 +-
 .../process/last/LastQuerySortOperator.java        | 11 +--
 .../operator/process/last/LastQueryUtil.java       | 18 +++++
 .../process/last/UpdateLastCacheOperator.java      | 84 +-------------------
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  1 -
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    | 20 +++--
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 89 +++++++++++++---------
 .../db/mpp/plan/planner/SubPlanTypeExtractor.java  | 29 +++++++
 .../mpp/execution/operator/OperatorMemoryTest.java |  5 +-
 12 files changed, 244 insertions(+), 198 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java b/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java
index de2a16c4ad..affcf4b70c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java
@@ -92,6 +92,12 @@ public class AlignedPath extends PartialPath {
     measurementList.add(subSensor);
   }
 
+  public AlignedPath(PartialPath vectorPath) {
+    super(vectorPath.getNodes());
+    measurementList = new ArrayList<>();
+    schemaList = new ArrayList<>();
+  }
+
   public AlignedPath(MeasurementPath path) {
     super(path.getDevicePath().getNodes());
     measurementList = new ArrayList<>();
@@ -157,6 +163,18 @@ public class AlignedPath extends PartialPath {
     schemaList.add(measurementPath.getMeasurementSchema());
   }
 
+  public void addMeasurement(String measurement, IMeasurementSchema measurementSchema) {
+    if (measurementList == null) {
+      measurementList = new ArrayList<>();
+    }
+    measurementList.add(measurement);
+
+    if (schemaList == null) {
+      schemaList = new ArrayList<>();
+    }
+    schemaList.add(measurementSchema);
+  }
+
   /**
    * merge another aligned path's sub sensors into this one
    *
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
index 8afd9b99de..0b4fad3bda 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
@@ -16,62 +16,45 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.process.last;
 
-import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
 import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ListenableFuture;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
-public class UpdateLastCacheOperator implements ProcessOperator {
-
-  private static final TsBlock LAST_QUERY_EMPTY_TSBLOCK =
+public abstract class AbstractUpdateLastCacheOperator implements ProcessOperator {
+  protected static final TsBlock LAST_QUERY_EMPTY_TSBLOCK =
       new TsBlockBuilder(ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT))
           .build();
 
-  private final OperatorContext operatorContext;
-
-  private final Operator child;
-
-  // fullPath for queried time series
-  // It should be exact PartialPath, neither MeasurementPath nor AlignedPath, because lastCache only
-  // accept PartialPath
-  private final MeasurementPath fullPath;
+  protected OperatorContext operatorContext;
 
-  // dataType for queried time series;
-  private final String dataType;
+  protected Operator child;
 
-  private final DataNodeSchemaCache lastCache;
+  protected DataNodeSchemaCache lastCache;
 
-  private final boolean needUpdateCache;
+  protected boolean needUpdateCache;
 
-  private final TsBlockBuilder tsBlockBuilder;
+  protected TsBlockBuilder tsBlockBuilder;
 
-  private String databaseName;
+  protected String databaseName;
 
-  public UpdateLastCacheOperator(
+  public AbstractUpdateLastCacheOperator(
       OperatorContext operatorContext,
       Operator child,
-      MeasurementPath fullPath,
-      TSDataType dataType,
       DataNodeSchemaCache dataNodeSchemaCache,
       boolean needUpdateCache) {
     this.operatorContext = operatorContext;
     this.child = child;
-    this.fullPath = fullPath;
-    this.dataType = dataType.name();
     this.lastCache = dataNodeSchemaCache;
     this.needUpdateCache = needUpdateCache;
     this.tsBlockBuilder = LastQueryUtil.createTsBlockBuilder(1);
@@ -87,40 +70,7 @@ public class UpdateLastCacheOperator implements ProcessOperator {
     return child.isBlocked();
   }
 
-  @Override
-  public TsBlock next() {
-    TsBlock res = child.next();
-    if (res == null) {
-      return null;
-    }
-    if (res.isEmpty()) {
-      return LAST_QUERY_EMPTY_TSBLOCK;
-    }
-
-    checkArgument(res.getPositionCount() == 1, "last query result should only have one record");
-
-    // last value is null
-    if (res.getColumn(0).isNull(0)) {
-      return LAST_QUERY_EMPTY_TSBLOCK;
-    }
-
-    long lastTime = res.getColumn(0).getLong(0);
-    TsPrimitiveType lastValue = res.getColumn(1).getTsPrimitiveType(0);
-
-    if (needUpdateCache) {
-      TimeValuePair timeValuePair = new TimeValuePair(lastTime, lastValue);
-      lastCache.updateLastCache(getDatabaseName(), fullPath, timeValuePair, false, Long.MIN_VALUE);
-    }
-
-    tsBlockBuilder.reset();
-
-    LastQueryUtil.appendLastValue(
-        tsBlockBuilder, lastTime, fullPath.getFullPath(), lastValue.getStringValue(), dataType);
-
-    return tsBlockBuilder.build();
-  }
-
-  private String getDatabaseName() {
+  protected String getDatabaseName() {
     if (databaseName == null) {
       databaseName =
           ((DataDriverContext) operatorContext.getInstanceContext().getDriverContext())
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
new file mode 100644
index 0000000000..14354cbc3b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.last;
+
+import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import static org.weakref.jmx.internal.guava.base.Preconditions.checkArgument;
+
+/** update last cache for aligned series */
+public class AlignedUpdateLastCacheOperator extends AbstractUpdateLastCacheOperator {
+
+  private final AlignedPath seriesPath;
+
+  private final PartialPath devicePath;
+
+  public AlignedUpdateLastCacheOperator(
+      OperatorContext operatorContext,
+      Operator child,
+      AlignedPath seriesPath,
+      DataNodeSchemaCache dataNodeSchemaCache,
+      boolean needUpdateCache) {
+    super(operatorContext, child, dataNodeSchemaCache, needUpdateCache);
+    this.seriesPath = seriesPath;
+    this.devicePath = seriesPath.getDevicePath();
+  }
+
+  @Override
+  public TsBlock next() {
+    TsBlock res = child.next();
+    if (res == null) {
+      return null;
+    }
+    if (res.isEmpty()) {
+      return LAST_QUERY_EMPTY_TSBLOCK;
+    }
+
+    checkArgument(res.getPositionCount() == 1, "last query result should only have one record");
+
+    tsBlockBuilder.reset();
+    for (int i = 0; i + 1 < res.getValueColumnCount(); i += 2) {
+      if (!res.getColumn(i).isNull(0)) {
+        long lastTime = res.getColumn(i).getLong(0);
+        TsPrimitiveType lastValue = res.getColumn(i + 1).getTsPrimitiveType(0);
+        MeasurementPath measurementPath =
+            new MeasurementPath(
+                devicePath.concatNode(seriesPath.getMeasurementList().get(i / 2)),
+                seriesPath.getSchemaList().get(i / 2),
+                true);
+        if (needUpdateCache) {
+          TimeValuePair timeValuePair = new TimeValuePair(lastTime, lastValue);
+          lastCache.updateLastCache(
+              getDatabaseName(), measurementPath, timeValuePair, false, Long.MIN_VALUE);
+        }
+        LastQueryUtil.appendLastValue(
+            tsBlockBuilder,
+            lastTime,
+            measurementPath.getFullPath(),
+            lastValue.getStringValue(),
+            seriesPath.getSchemaList().get(i / 2).getType().name());
+      }
+    }
+    return !tsBlockBuilder.isEmpty() ? tsBlockBuilder.build() : LAST_QUERY_EMPTY_TSBLOCK;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
index 27ae0becda..7a689d4aa0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
@@ -43,7 +43,7 @@ public class LastQueryOperator implements ProcessOperator {
 
   private final OperatorContext operatorContext;
 
-  private final List<UpdateLastCacheOperator> children;
+  private final List<AbstractUpdateLastCacheOperator> children;
 
   private final int inputOperatorsCount;
 
@@ -53,7 +53,7 @@ public class LastQueryOperator implements ProcessOperator {
 
   public LastQueryOperator(
       OperatorContext operatorContext,
-      List<UpdateLastCacheOperator> children,
+      List<AbstractUpdateLastCacheOperator> children,
       TsBlockBuilder builder) {
     this.operatorContext = operatorContext;
     this.children = children;
@@ -109,7 +109,7 @@ public class LastQueryOperator implements ProcessOperator {
         if (tsBlock == null) {
           return null;
         } else if (!tsBlock.isEmpty()) {
-          LastQueryUtil.appendLastValue(tsBlockBuilder, tsBlock, 0);
+          LastQueryUtil.appendLastValue(tsBlockBuilder, tsBlock);
         }
       }
       currentIndex++;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
index 082173c29f..aeb38b6b7d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
@@ -55,7 +55,7 @@ public class LastQuerySortOperator implements ProcessOperator {
   private int cachedTsBlockRowIndex;
 
   // we must make sure that Operator in children has already been sorted
-  private final List<UpdateLastCacheOperator> children;
+  private final List<AbstractUpdateLastCacheOperator> children;
 
   private final OperatorContext operatorContext;
 
@@ -73,7 +73,7 @@ public class LastQuerySortOperator implements ProcessOperator {
   public LastQuerySortOperator(
       OperatorContext operatorContext,
       TsBlock cachedTsBlock,
-      List<UpdateLastCacheOperator> children,
+      List<AbstractUpdateLastCacheOperator> children,
       Comparator<Binary> timeSeriesComparator) {
     this.cachedTsBlock = cachedTsBlock;
     this.cachedTsBlockSize = cachedTsBlock.getPositionCount();
@@ -117,7 +117,7 @@ public class LastQuerySortOperator implements ProcessOperator {
         if (canUseDataFromCachedTsBlock(previousTsBlock)) {
           LastQueryUtil.appendLastValue(tsBlockBuilder, cachedTsBlock, cachedTsBlockRowIndex++);
         } else {
-          LastQueryUtil.appendLastValue(tsBlockBuilder, previousTsBlock, 0);
+          LastQueryUtil.appendLastValue(tsBlockBuilder, previousTsBlock);
           previousTsBlock = null;
         }
       }
@@ -144,7 +144,7 @@ public class LastQuerySortOperator implements ProcessOperator {
         if (canUseDataFromCachedTsBlock(previousTsBlock)) {
           LastQueryUtil.appendLastValue(tsBlockBuilder, cachedTsBlock, cachedTsBlockRowIndex++);
         } else {
-          LastQueryUtil.appendLastValue(tsBlockBuilder, previousTsBlock, 0);
+          LastQueryUtil.appendLastValue(tsBlockBuilder, previousTsBlock);
           previousTsBlock = null;
         }
       } else {
@@ -157,7 +157,8 @@ public class LastQuerySortOperator implements ProcessOperator {
               LastQueryUtil.appendLastValue(tsBlockBuilder, cachedTsBlock, cachedTsBlockRowIndex++);
               previousTsBlock = tsBlock;
             } else {
-              LastQueryUtil.appendLastValue(tsBlockBuilder, tsBlock, 0);
+              // it is safe to append the whole TsBlock
+              LastQueryUtil.appendLastValue(tsBlockBuilder, tsBlock);
             }
           }
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryUtil.java
index 3ec62e8877..f8bb75f088 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryUtil.java
@@ -140,6 +140,24 @@ public class LastQueryUtil {
     return aggregators;
   }
 
+  public static List<Aggregator> createAggregators(TSDataType dataType, int valueColumnIndex) {
+    // max_time, last_value
+    List<Aggregator> aggregators = new ArrayList<>(2);
+    aggregators.add(
+        new Aggregator(
+            new MaxTimeDescAccumulator(),
+            AggregationStep.SINGLE,
+            Collections.singletonList(
+                new InputLocation[] {new InputLocation(0, valueColumnIndex)})));
+    aggregators.add(
+        new Aggregator(
+            new LastValueDescAccumulator(dataType),
+            AggregationStep.SINGLE,
+            Collections.singletonList(
+                new InputLocation[] {new InputLocation(0, valueColumnIndex)})));
+    return aggregators;
+  }
+
   public static boolean needUpdateCache(Filter timeFilter) {
     // Update the cache only when, the filter is gt (greater than) or ge (greater than or equal to)
     return CACHE_ENABLED && (timeFilter == null || timeFilter instanceof GtEq)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
index 8afd9b99de..7315c81f46 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
@@ -20,46 +20,24 @@ package org.apache.iotdb.db.mpp.execution.operator.process.last;
 
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
-import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.ListenableFuture;
-
 import static com.google.common.base.Preconditions.checkArgument;
 
-public class UpdateLastCacheOperator implements ProcessOperator {
-
-  private static final TsBlock LAST_QUERY_EMPTY_TSBLOCK =
-      new TsBlockBuilder(ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT))
-          .build();
-
-  private final OperatorContext operatorContext;
-
-  private final Operator child;
+public class UpdateLastCacheOperator extends AbstractUpdateLastCacheOperator {
 
   // fullPath for queried time series
   // It should be exact PartialPath, neither MeasurementPath nor AlignedPath, because lastCache only
   // accept PartialPath
-  private final MeasurementPath fullPath;
+  private MeasurementPath fullPath;
 
   // dataType for queried time series;
-  private final String dataType;
-
-  private final DataNodeSchemaCache lastCache;
-
-  private final boolean needUpdateCache;
-
-  private final TsBlockBuilder tsBlockBuilder;
-
-  private String databaseName;
+  private String dataType;
 
   public UpdateLastCacheOperator(
       OperatorContext operatorContext,
@@ -68,23 +46,9 @@ public class UpdateLastCacheOperator implements ProcessOperator {
       TSDataType dataType,
       DataNodeSchemaCache dataNodeSchemaCache,
       boolean needUpdateCache) {
-    this.operatorContext = operatorContext;
-    this.child = child;
+    super(operatorContext, child, dataNodeSchemaCache, needUpdateCache);
     this.fullPath = fullPath;
     this.dataType = dataType.name();
-    this.lastCache = dataNodeSchemaCache;
-    this.needUpdateCache = needUpdateCache;
-    this.tsBlockBuilder = LastQueryUtil.createTsBlockBuilder(1);
-  }
-
-  @Override
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  @Override
-  public ListenableFuture<?> isBlocked() {
-    return child.isBlocked();
   }
 
   @Override
@@ -119,44 +83,4 @@ public class UpdateLastCacheOperator implements ProcessOperator {
 
     return tsBlockBuilder.build();
   }
-
-  private String getDatabaseName() {
-    if (databaseName == null) {
-      databaseName =
-          ((DataDriverContext) operatorContext.getInstanceContext().getDriverContext())
-              .getDataRegion()
-              .getStorageGroupName();
-    }
-    return databaseName;
-  }
-
-  @Override
-  public boolean hasNext() {
-    return child.hasNext();
-  }
-
-  @Override
-  public boolean isFinished() {
-    return child.isFinished();
-  }
-
-  @Override
-  public void close() throws Exception {
-    child.close();
-  }
-
-  @Override
-  public long calculateMaxPeekMemory() {
-    return child.calculateMaxPeekMemory();
-  }
-
-  @Override
-  public long calculateMaxReturnSize() {
-    return child.calculateMaxReturnSize();
-  }
-
-  @Override
-  public long calculateRetainedSizeAfterCallingNext() {
-    return child.calculateRetainedSizeAfterCallingNext();
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 5d27adb255..aff917599b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -353,7 +353,6 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
               .collect(Collectors.toCollection(LinkedHashSet::new));
     }
 
-    sourceExpressions.forEach(expression -> analyzeExpression(analysis, expression));
     analysis.setSourceExpressions(sourceExpressions);
 
     analysis.setRespDatasetHeader(DatasetHeaderFactory.getLastQueryHeader());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index 4033ddf52c..a04f877064 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -192,18 +192,22 @@ public class LogicalPlanBuilder {
       Filter globalTimeFilter,
       OrderByParameter mergeOrderParameter) {
     List<PlanNode> sourceNodeList = new ArrayList<>();
-    for (Expression sourceExpression : sourceExpressions) {
-      MeasurementPath selectPath =
-          (MeasurementPath) ((TimeSeriesOperand) sourceExpression).getPath();
-      if (selectPath.isUnderAlignedEntity()) {
+    List<PartialPath> selectedPaths =
+        sourceExpressions.stream()
+            .map(expression -> ((TimeSeriesOperand) expression).getPath())
+            .collect(Collectors.toList());
+    List<PartialPath> groupedPaths = MetaUtils.groupAlignedSeries(selectedPaths);
+    for (PartialPath path : groupedPaths) {
+      if (path instanceof MeasurementPath) { // non-aligned series
+        sourceNodeList.add(
+            new LastQueryScanNode(context.getQueryId().genPlanNodeId(), (MeasurementPath) path));
+      } else if (path instanceof AlignedPath) { // aligned series
         sourceNodeList.add(
-            new AlignedLastQueryScanNode(
-                context.getQueryId().genPlanNodeId(), new AlignedPath(selectPath)));
+            new AlignedLastQueryScanNode(context.getQueryId().genPlanNodeId(), (AlignedPath) path));
       } else {
-        sourceNodeList.add(new LastQueryScanNode(context.getQueryId().genPlanNodeId(), selectPath));
+        throw new IllegalArgumentException("unexpected path type");
       }
     }
-    updateTypeProvider(sourceExpressions);
 
     this.root =
         new LastQueryNode(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 484a342715..aea6df9299 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -82,6 +82,8 @@ import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.MultiColumn
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.NonOverlappedMultiColumnMerger;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.AbstractUpdateLastCacheOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.AlignedUpdateLastCacheOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryCollectOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryOperator;
@@ -1704,34 +1706,46 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
   @Override
   public Operator visitAlignedLastQueryScan(
       AlignedLastQueryScanNode node, LocalExecutionPlanContext context) {
-    PartialPath seriesPath = node.getSeriesPath().transformToPartialPath();
-    TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(seriesPath);
-    if (timeValuePair == null) { // last value is not cached
-      return createUpdateLastCacheOperator(
-          node, context, node.getSeriesPath().getMeasurementPath());
-    } else if (!LastQueryUtil.satisfyFilter(
-        updateFilterUsingTTL(context.getLastQueryTimeFilter(), context.getDataRegionTTL()),
-        timeValuePair)) { // cached last value is not satisfied
-
-      boolean isFilterGtOrGe =
-          (context.getLastQueryTimeFilter() instanceof Gt
-              || context.getLastQueryTimeFilter() instanceof GtEq);
-      // time filter is not > or >=, we still need to read from disk
-      if (!isFilterGtOrGe) {
-        return createUpdateLastCacheOperator(
-            node, context, node.getSeriesPath().getMeasurementPath());
-      } else { // otherwise, we just ignore it and return null
-        return null;
+    AlignedPath alignedPath = node.getSeriesPath();
+    PartialPath devicePath = alignedPath.getDevicePath();
+    // get series under aligned entity that has not been cached
+    List<Integer> unCachedMeasurementIndexes = new ArrayList<>();
+    List<String> measurementList = alignedPath.getMeasurementList();
+    for (int i = 0; i < measurementList.size(); i++) {
+      PartialPath measurementPath = devicePath.concatNode(measurementList.get(i));
+      TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(measurementPath);
+      if (timeValuePair == null) { // last value is not cached
+        unCachedMeasurementIndexes.add(i);
+      } else if (!LastQueryUtil.satisfyFilter(
+          updateFilterUsingTTL(context.getLastQueryTimeFilter(), context.getDataRegionTTL()),
+          timeValuePair)) { // cached last value is not satisfied
+
+        boolean isFilterGtOrGe =
+            (context.getLastQueryTimeFilter() instanceof Gt
+                || context.getLastQueryTimeFilter() instanceof GtEq);
+        // time filter is not > or >=, we still need to read from disk
+        if (!isFilterGtOrGe) {
+          unCachedMeasurementIndexes.add(i);
+        }
+      } else { //  cached last value is satisfied, put it into LastCacheScanOperator
+        context.addCachedLastValue(timeValuePair, measurementPath.getFullPath());
       }
-    } else { //  cached last value is satisfied, put it into LastCacheScanOperator
-      context.addCachedLastValue(timeValuePair, seriesPath.getFullPath());
+    }
+    if (unCachedMeasurementIndexes.isEmpty()) {
       return null;
+    } else {
+      AlignedPath unCachedPath = new AlignedPath(alignedPath.getDevicePath());
+      for (int i : unCachedMeasurementIndexes) {
+        unCachedPath.addMeasurement(measurementList.get(i), alignedPath.getSchemaList().get(i));
+      }
+      return createAlignedUpdateLastCacheOperator(node, unCachedPath, context);
     }
   }
 
-  private UpdateLastCacheOperator createUpdateLastCacheOperator(
-      AlignedLastQueryScanNode node, LocalExecutionPlanContext context, MeasurementPath fullPath) {
-    AlignedSeriesAggregationScanOperator lastQueryScan = createLastQueryScanOperator(node, context);
+  private AlignedUpdateLastCacheOperator createAlignedUpdateLastCacheOperator(
+      AlignedLastQueryScanNode node, AlignedPath unCachedPath, LocalExecutionPlanContext context) {
+    AlignedSeriesAggregationScanOperator lastQueryScan =
+        createLastQueryScanOperator(node, unCachedPath, context);
 
     OperatorContext operatorContext =
         context
@@ -1739,20 +1753,18 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
             .addOperatorContext(
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
-                UpdateLastCacheOperator.class.getSimpleName());
+                AlignedUpdateLastCacheOperator.class.getSimpleName());
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
-    return new UpdateLastCacheOperator(
+    return new AlignedUpdateLastCacheOperator(
         operatorContext,
         lastQueryScan,
-        fullPath,
-        node.getSeriesPath().getSchemaList().get(0).getType(),
+        unCachedPath,
         DATA_NODE_SCHEMA_CACHE,
         context.isNeedUpdateLastCache());
   }
 
   private AlignedSeriesAggregationScanOperator createLastQueryScanOperator(
-      AlignedLastQueryScanNode node, LocalExecutionPlanContext context) {
-    AlignedPath seriesPath = node.getSeriesPath();
+      AlignedLastQueryScanNode node, AlignedPath unCachedPath, LocalExecutionPlanContext context) {
     OperatorContext operatorContext =
         context
             .getInstanceContext()
@@ -1762,17 +1774,18 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                 AlignedSeriesAggregationScanOperator.class.getSimpleName());
 
     // last_time, last_value
-    List<Aggregator> aggregators =
-        LastQueryUtil.createAggregators(seriesPath.getSchemaList().get(0).getType());
+    List<Aggregator> aggregators = new ArrayList<>();
+    for (int i = 0; i < unCachedPath.getMeasurementList().size(); i++) {
+      aggregators.addAll(
+          LastQueryUtil.createAggregators(unCachedPath.getSchemaList().get(i).getType(), i));
+    }
     ITimeRangeIterator timeRangeIterator = initTimeRangeIterator(null, false, false);
-    long maxReturnSize =
-        calculateMaxAggregationResultSizeForLastQuery(
-            aggregators, seriesPath.transformToPartialPath());
+    long maxReturnSize = calculateMaxAggregationResultSizeForLastQuery(aggregators, unCachedPath);
 
     AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
         new AlignedSeriesAggregationScanOperator(
             node.getPlanNodeId(),
-            seriesPath,
+            unCachedPath,
             operatorContext,
             aggregators,
             timeRangeIterator,
@@ -1781,7 +1794,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
             null,
             maxReturnSize);
     context.addSourceOperator(seriesAggregationScanOperator);
-    context.addPath(seriesPath);
+    context.addPath(unCachedPath);
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
     return seriesAggregationScanOperator;
   }
@@ -1798,11 +1811,11 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     context.setLastQueryTimeFilter(node.getTimeFilter());
     context.setNeedUpdateLastCache(LastQueryUtil.needUpdateCache(node.getTimeFilter()));
 
-    List<UpdateLastCacheOperator> operatorList =
+    List<AbstractUpdateLastCacheOperator> operatorList =
         node.getChildren().stream()
             .map(child -> child.accept(this, context))
             .filter(Objects::nonNull)
-            .map(o -> (UpdateLastCacheOperator) o)
+            .map(o -> (AbstractUpdateLastCacheOperator) o)
             .collect(Collectors.toList());
 
     List<Pair<TimeValuePair, Binary>> cachedLastValueAndPathList =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java
index a95adc10c8..38d46ad5e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java
@@ -27,7 +27,12 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 
@@ -105,6 +110,30 @@ public class SubPlanTypeExtractor {
       return visitPlan(node, context);
     }
 
+    // region PlanNode of last query
+    // Last-query plan nodes need no type extraction, so these visitors return null without descending
+    public Void visitLastQueryScan(LastQueryScanNode node, Void context) {
+      return null;
+    }
+
+    public Void visitAlignedLastQueryScan(AlignedLastQueryScanNode node, Void context) {
+      return null;
+    }
+
+    public Void visitLastQuery(LastQueryNode node, Void context) {
+      return null;
+    }
+
+    public Void visitLastQueryMerge(LastQueryMergeNode node, Void context) {
+      return null;
+    }
+
+    public Void visitLastQueryCollect(LastQueryCollectNode node, Void context) {
+      return null;
+    }
+
+    // endregion PlanNode of last query
+
     private void updateTypeProviderByAggregationDescriptor(
         List<? extends AggregationDescriptor> aggregationDescriptorList) {
       aggregationDescriptorList.stream()
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index 7633beedd9..585ed15f43 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill
 import org.apache.iotdb.db.mpp.execution.operator.process.join.RowBasedTimeJoinOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.AbstractUpdateLastCacheOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryCollectOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryOperator;
@@ -325,7 +326,7 @@ public class OperatorMemoryTest {
   public void lastQueryOperatorTest() {
     TsBlockBuilder builder = Mockito.mock(TsBlockBuilder.class);
     Mockito.when(builder.getRetainedSizeInBytes()).thenReturn(1024L);
-    List<UpdateLastCacheOperator> children = new ArrayList<>(4);
+    List<AbstractUpdateLastCacheOperator> children = new ArrayList<>(4);
     long expectedMaxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
     for (int i = 0; i < 4; i++) {
       UpdateLastCacheOperator child = Mockito.mock(UpdateLastCacheOperator.class);
@@ -355,7 +356,7 @@ public class OperatorMemoryTest {
     TsBlock tsBlock = Mockito.mock(TsBlock.class);
     Mockito.when(tsBlock.getRetainedSizeInBytes()).thenReturn(16 * 1024L);
     Mockito.when(tsBlock.getPositionCount()).thenReturn(16);
-    List<UpdateLastCacheOperator> children = new ArrayList<>(4);
+    List<AbstractUpdateLastCacheOperator> children = new ArrayList<>(4);
 
     for (int i = 0; i < 4; i++) {
       UpdateLastCacheOperator child = Mockito.mock(UpdateLastCacheOperator.class);