You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/07/22 08:55:28 UTC

[iotdb] 01/02: start

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-3883
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fe98c3362fc7d815632ae2c00941d7a23c12959e
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu Jul 21 14:58:16 2022 +0800

    start
---
 .../db/mpp/execution/operator/LastQueryUtil.java   |  37 ++++
 .../operator/process/ProcessOperator.java          |   4 +-
 .../process/{ => last}/LastQueryMergeOperator.java |  38 ++---
 .../operator/process/last/LastQueryOperator.java   | 137 +++++++++++++++
 .../process/last/LastQuerySortOperator.java        | 190 +++++++++++++++++++++
 .../{ => last}/UpdateLastCacheOperator.java        |   3 +-
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |   8 +-
 ...peratorTest.java => LastQueryOperatorTest.java} |  34 ++--
 .../operator/UpdateLastCacheOperatorTest.java      |   2 +-
 9 files changed, 407 insertions(+), 46 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
index 50e08472d3..408e5cd56c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.operator.Gt;
@@ -36,6 +37,7 @@ import com.google.common.collect.ImmutableList;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 
 public class LastQueryUtil {
@@ -66,6 +68,41 @@ public class LastQueryUtil {
     builder.declarePosition();
   }
 
+
+  public static void appendLastValue(TsBlockBuilder builder, TsBlock tsBlock) {
+    if (tsBlock.isEmpty()) {
+      return;
+    }
+    int size = tsBlock.getPositionCount();
+    for (int i = 0; i < size; i++) {
+      // Time
+      builder.getTimeColumnBuilder().writeLong(tsBlock.getTimeByIndex(i));
+      // timeseries
+      builder.getColumnBuilder(0).writeBinary(tsBlock.getColumn(0).getBinary(i));
+      // value
+      builder.getColumnBuilder(1).writeBinary(tsBlock.getColumn(1).getBinary(i));
+      // dataType
+      builder.getColumnBuilder(2).writeBinary(tsBlock.getColumn(2).getBinary(i));
+      builder.declarePosition();
+    }
+  }
+
+  public static void appendLastValue(TsBlockBuilder builder, TsBlock tsBlock, int index) {
+    // Time
+    builder.getTimeColumnBuilder().writeLong(tsBlock.getTimeByIndex(index));
+    // timeseries
+    builder.getColumnBuilder(0).writeBinary(tsBlock.getColumn(0).getBinary(index));
+    // value
+    builder.getColumnBuilder(1).writeBinary(tsBlock.getColumn(1).getBinary(index));
+    // dataType
+    builder.getColumnBuilder(2).writeBinary(tsBlock.getColumn(2).getBinary(index));
+    builder.declarePosition();
+  }
+
+  public static int compareTimeSeries(TsBlock a, int indexA, TsBlock b, int indexB, Comparator<Binary> comparator) {
+    return comparator.compare(a.getColumn(0).getBinary(indexA), b.getColumn(0).getBinary(indexB));
+  }
+
   public static boolean satisfyFilter(Filter filter, TimeValuePair tvPair) {
     return filter == null || filter.satisfy(tvPair.getTimestamp(), tvPair.getValue().getValue());
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/ProcessOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/ProcessOperator.java
index 6205cb766d..a6f64ff295 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/ProcessOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/ProcessOperator.java
@@ -21,4 +21,6 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 
 // TODO should think about what interfaces should this ProcessOperator have
-public interface ProcessOperator extends Operator {}
+public interface ProcessOperator extends Operator {
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
similarity index 75%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
index 91aeecec02..d6998ef1af 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
@@ -16,17 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.operator.process;
+package org.apache.iotdb.db.mpp.execution.operator.process.last;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 
 import java.util.List;
 
+// Merges all last-query results from different data regions; for the same time series, the entry with the max time is selected.
 public class LastQueryMergeOperator implements ProcessOperator {
 
   private final OperatorContext operatorContext;
@@ -37,11 +39,14 @@ public class LastQueryMergeOperator implements ProcessOperator {
 
   private int currentIndex;
 
+  private TsBlockBuilder tsBlockBuilder;
+
   public LastQueryMergeOperator(OperatorContext operatorContext, List<Operator> children) {
     this.operatorContext = operatorContext;
     this.children = children;
     this.inputOperatorsCount = children.size();
     this.currentIndex = 0;
+    this.tsBlockBuilder = LastQueryUtil.createTsBlockBuilder();
   }
 
   @Override
@@ -51,37 +56,26 @@ public class LastQueryMergeOperator implements ProcessOperator {
 
   @Override
   public ListenableFuture<?> isBlocked() {
-    if (currentIndex < inputOperatorsCount) {
-      return children.get(currentIndex).isBlocked();
-    } else {
-      return Futures.immediateVoidFuture();
-    }
+    return ProcessOperator.super.isBlocked();
   }
 
   @Override
   public TsBlock next() {
-    if (children.get(currentIndex).hasNext()) {
-      return children.get(currentIndex).next();
-    } else {
-      currentIndex++;
-      return null;
-    }
+    return null;
   }
 
   @Override
   public boolean hasNext() {
-    return currentIndex < inputOperatorsCount;
+    return false;
   }
 
   @Override
-  public boolean isFinished() {
-    return !hasNext();
+  public void close() throws Exception {
+    ProcessOperator.super.close();
   }
 
   @Override
-  public void close() throws Exception {
-    for (Operator child : children) {
-      child.close();
-    }
+  public boolean isFinished() {
+    return false;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
new file mode 100644
index 0000000000..7152347293
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.last;
+
+import org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
+
+// Collects all last-query results in the same data region; no ordering of the results is guaranteed.
+public class LastQueryOperator implements ProcessOperator {
+
+  private static final int MAX_DETECT_COUNT = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
+
+  private final OperatorContext operatorContext;
+
+  private final List<UpdateLastCacheOperator> children;
+
+  private final int inputOperatorsCount;
+
+  private int currentIndex;
+
+  private TsBlockBuilder tsBlockBuilder;
+
+
+  public LastQueryOperator(OperatorContext operatorContext, List<UpdateLastCacheOperator> children, TsBlockBuilder builder) {
+    this.operatorContext = operatorContext;
+    this.children = children;
+    this.inputOperatorsCount = children.size();
+    this.currentIndex = 0;
+    this.tsBlockBuilder = builder;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    if (currentIndex < inputOperatorsCount) {
+      int endIndex = getEndIndex();
+      List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
+      for (int i = currentIndex; i < endIndex; i++) {
+        ListenableFuture<?> blocked = children.get(i).isBlocked();
+        if (!blocked.isDone()) {
+          listenableFutures.add(blocked);
+        }
+      }
+      return listenableFutures.isEmpty() ? NOT_BLOCKED : successfulAsList(listenableFutures);
+    } else {
+      return Futures.immediateVoidFuture();
+    }
+  }
+
+  @Override
+  public TsBlock next() {
+
+    // All data from the child operators has been consumed; just return whatever remains cached in tsBlockBuilder.
+    if (currentIndex >= inputOperatorsCount) {
+      return tsBlockBuilder.build();
+    }
+
+    // start stopwatch
+    long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+    long start = System.nanoTime();
+
+    int endIndex = getEndIndex();
+
+    while ((System.nanoTime() - start < maxRuntime) && (currentIndex < endIndex) && !tsBlockBuilder.isFull()) {
+      if (children.get(currentIndex).hasNext()) {
+        TsBlock tsBlock = children.get(currentIndex).next();
+        if (tsBlock == null) {
+          return null;
+        } else if (!tsBlock.isEmpty()) {
+          LastQueryUtil.appendLastValue(tsBlockBuilder, tsBlock, 0);
+        }
+      }
+      currentIndex++;
+    }
+
+    TsBlock res = tsBlockBuilder.build();
+    tsBlockBuilder.reset();
+    return res;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return currentIndex < inputOperatorsCount || !tsBlockBuilder.isEmpty();
+  }
+
+  @Override
+  public boolean isFinished() {
+    return !hasNext();
+  }
+
+  @Override
+  public void close() throws Exception {
+    for (Operator child : children) {
+      child.close();
+    }
+    tsBlockBuilder = null;
+  }
+
+  private int getEndIndex() {
+    return currentIndex + Math.min(MAX_DETECT_COUNT, inputOperatorsCount - currentIndex);
+  }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
new file mode 100644
index 0000000000..458e2c70ef
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.last;
+
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+import static org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil.compareTimeSeries;
+
+// Collects all last-query results in the same data region and sorts them by the alphabetical order of their time series.
+public class LastQuerySortOperator implements ProcessOperator {
+
+  private static final int MAX_DETECT_COUNT = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
+
+  // we must make sure that data in cachedTsBlock has already been sorted
+  // values that have last cache
+  private TsBlock cachedTsBlock;
+
+  private final int cachedTsBlockSize;
+
+  // read index for cachedTsBlock
+  private int cachedTsBlockRowIndex;
+
+  // the operators in children must already be sorted
+  private final List<UpdateLastCacheOperator> children;
+
+  private final OperatorContext operatorContext;
+
+  private final int inputOperatorsCount;
+
+  private int currentIndex;
+
+  private final TsBlockBuilder tsBlockBuilder;
+
+  private final Comparator<Binary> timeSeriesComparator;
+
+  // caches the previous TsBlock fetched from children
+  private TsBlock previousTsBlock;
+
+  public LastQuerySortOperator(OperatorContext operatorContext, TsBlock cachedTsBlock, List<UpdateLastCacheOperator> children, Comparator<Binary> timeSeriesComparator) {
+    this.cachedTsBlock = cachedTsBlock;
+    this.cachedTsBlockSize = cachedTsBlock.getPositionCount();
+    this.operatorContext = operatorContext;
+    this.children = children;
+    this.inputOperatorsCount = children.size();
+    this.currentIndex = 0;
+    this.tsBlockBuilder = LastQueryUtil.createTsBlockBuilder();
+    this.timeSeriesComparator = timeSeriesComparator;
+    this.previousTsBlock = null;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    if (currentIndex < inputOperatorsCount) {
+      int endIndex = getEndIndex();
+      List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
+      for (int i = currentIndex; i < endIndex; i++) {
+        ListenableFuture<?> blocked = children.get(i).isBlocked();
+        if (!blocked.isDone()) {
+          listenableFutures.add(blocked);
+        }
+      }
+      return listenableFutures.isEmpty() ? NOT_BLOCKED : successfulAsList(listenableFutures);
+    } else {
+      return Futures.immediateVoidFuture();
+    }
+  }
+
+  @Override
+  public TsBlock next() {
+    // All data from the child operators has been consumed; just return whatever remains cached in cachedTsBlock, tsBlockBuilder and previousTsBlock.
+    if (currentIndex >= inputOperatorsCount) {
+      while (previousTsBlock != null) {
+        if (canUseDataFromCachedTsBlock(previousTsBlock)) {
+          LastQueryUtil.appendLastValue(tsBlockBuilder, cachedTsBlock, cachedTsBlockRowIndex++);
+        } else {
+          LastQueryUtil.appendLastValue(tsBlockBuilder, previousTsBlock, 0);
+          previousTsBlock = null;
+        }
+      }
+      TsBlock res = cachedTsBlock.subTsBlock(cachedTsBlockRowIndex);
+      cachedTsBlockRowIndex = cachedTsBlockSize;
+      if (!tsBlockBuilder.isEmpty()) {
+        LastQueryUtil.appendLastValue(tsBlockBuilder, res);
+        res = tsBlockBuilder.build();
+        tsBlockBuilder.reset();
+      }
+      return res;
+    }
+
+
+    // start stopwatch
+    long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+    long start = System.nanoTime();
+
+    int endIndex = getEndIndex();
+
+    while ((System.nanoTime() - start < maxRuntime) && (currentIndex < endIndex || previousTsBlock != null) && !tsBlockBuilder.isFull()) {
+      if (previousTsBlock != null) {
+        if (canUseDataFromCachedTsBlock(previousTsBlock)) {
+          LastQueryUtil.appendLastValue(tsBlockBuilder, cachedTsBlock, cachedTsBlockRowIndex++);
+        } else {
+          LastQueryUtil.appendLastValue(tsBlockBuilder, previousTsBlock, 0);
+          previousTsBlock = null;
+        }
+      } else {
+        if (children.get(currentIndex).hasNext()) {
+          TsBlock tsBlock = children.get(currentIndex).next();
+          if (tsBlock == null) {
+            return null;
+          } else if (!tsBlock.isEmpty()) {
+            if (canUseDataFromCachedTsBlock(tsBlock)) {
+              LastQueryUtil.appendLastValue(tsBlockBuilder, cachedTsBlock, cachedTsBlockRowIndex++);
+              previousTsBlock = tsBlock;
+            } else {
+              LastQueryUtil.appendLastValue(tsBlockBuilder, tsBlock, 0);
+            }
+          }
+        }
+        currentIndex++;
+      }
+    }
+
+    TsBlock res = tsBlockBuilder.build();
+    tsBlockBuilder.reset();
+    return res;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return currentIndex < inputOperatorsCount || cachedTsBlockRowIndex < cachedTsBlockSize || !tsBlockBuilder.isEmpty() || previousTsBlock != null;
+  }
+
+  @Override
+  public void close() throws Exception {
+    for (Operator child : children) {
+      child.close();
+    }
+    cachedTsBlock = null;
+  }
+
+  @Override
+  public boolean isFinished() {
+    return !hasNext();
+  }
+
+  private int getEndIndex() {
+    return currentIndex + Math.min(MAX_DETECT_COUNT, inputOperatorsCount - currentIndex);
+  }
+
+  private boolean canUseDataFromCachedTsBlock(TsBlock tsBlock) {
+    return cachedTsBlockRowIndex < cachedTsBlockSize && compareTimeSeries(cachedTsBlock, cachedTsBlockRowIndex, tsBlock, 0, timeSeriesComparator) < 0;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
similarity index 96%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
index 4088a13396..036295ace0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
@@ -16,13 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.operator.process;
+package org.apache.iotdb.db.mpp.execution.operator.process.last;
 
 import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 704d49c5ea..b8aef10211 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -47,7 +47,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.FilterOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.LastQueryMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
@@ -56,7 +56,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOper
 import org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.UpdateLastCacheOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.UpdateLastCacheOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BinaryConstantFill;
@@ -1371,9 +1371,9 @@ public class LocalExecutionPlanner {
           context.instanceContext.addOperatorContext(
               context.getNextOperatorId(),
               node.getPlanNodeId(),
-              LastQueryMergeOperator.class.getSimpleName());
+              LastQueryOperator.class.getSimpleName());
       context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
-      return new LastQueryMergeOperator(operatorContext, operatorList);
+      return new LastQueryOperator(operatorContext, operatorList);
     }
 
     private Map<String, List<InputLocation>> makeLayout(PlanNode node) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
similarity index 93%
rename from server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
index c0ced72d91..5804095ffd 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
@@ -30,8 +30,8 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
-import org.apache.iotdb.db.mpp.execution.operator.process.LastQueryMergeOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.UpdateLastCacheOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.UpdateLastCacheOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
@@ -60,7 +60,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class LastQueryMergeOperatorTest {
+public class LastQueryOperatorTest {
 
   private static final String SERIES_SCAN_OPERATOR_TEST_SG = "root.LastQueryMergeOperatorTest";
   private final List<String> deviceIds = new ArrayList<>();
@@ -117,7 +117,7 @@ public class LastQueryMergeOperatorTest {
 
       PlanNodeId planNodeId5 = new PlanNodeId("5");
       fragmentInstanceContext.addOperatorContext(
-          5, planNodeId5, LastQueryMergeOperator.class.getSimpleName());
+          5, planNodeId5, LastQueryOperator.class.getSimpleName());
 
       fragmentInstanceContext
           .getOperatorContexts()
@@ -170,16 +170,16 @@ public class LastQueryMergeOperatorTest {
               null,
               false);
 
-      LastQueryMergeOperator lastQueryMergeOperator =
-          new LastQueryMergeOperator(
+      LastQueryOperator lastQueryOperator =
+          new LastQueryOperator(
               fragmentInstanceContext.getOperatorContexts().get(4),
               ImmutableList.of(updateLastCacheOperator1, updateLastCacheOperator2));
 
       int count = 0;
-      while (!lastQueryMergeOperator.isFinished()) {
-        assertTrue(lastQueryMergeOperator.isBlocked().isDone());
-        assertTrue(lastQueryMergeOperator.hasNext());
-        TsBlock result = lastQueryMergeOperator.next();
+      while (!lastQueryOperator.isFinished()) {
+        assertTrue(lastQueryOperator.isBlocked().isDone());
+        assertTrue(lastQueryOperator.hasNext());
+        TsBlock result = lastQueryOperator.next();
         if (result == null) {
           continue;
         }
@@ -239,7 +239,7 @@ public class LastQueryMergeOperatorTest {
 
       PlanNodeId planNodeId6 = new PlanNodeId("6");
       fragmentInstanceContext.addOperatorContext(
-          6, planNodeId6, LastQueryMergeOperator.class.getSimpleName());
+          6, planNodeId6, LastQueryOperator.class.getSimpleName());
 
       fragmentInstanceContext
           .getOperatorContexts()
@@ -307,17 +307,17 @@ public class LastQueryMergeOperatorTest {
           new LastCacheScanOperator(
               fragmentInstanceContext.getOperatorContexts().get(4), planNodeId5, tsBlock);
 
-      LastQueryMergeOperator lastQueryMergeOperator =
-          new LastQueryMergeOperator(
+      LastQueryOperator lastQueryOperator =
+          new LastQueryOperator(
               fragmentInstanceContext.getOperatorContexts().get(5),
               ImmutableList.of(
                   updateLastCacheOperator1, updateLastCacheOperator2, lastCacheScanOperator));
 
       int count = 0;
-      while (!lastQueryMergeOperator.isFinished()) {
-        assertTrue(lastQueryMergeOperator.isBlocked().isDone());
-        assertTrue(lastQueryMergeOperator.hasNext());
-        TsBlock result = lastQueryMergeOperator.next();
+      while (!lastQueryOperator.isFinished()) {
+        assertTrue(lastQueryOperator.isBlocked().isDone());
+        assertTrue(lastQueryOperator.hasNext());
+        TsBlock result = lastQueryOperator.next();
         if (result == null) {
           continue;
         }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
index 581f1e703e..4567680edb 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
-import org.apache.iotdb.db.mpp.execution.operator.process.UpdateLastCacheOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.UpdateLastCacheOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;