You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/12/18 15:58:45 UTC

[iotdb] 02/05: add metrics: operator_execution

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/addQueryMetrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 868a7586c18124034f409d00069ecedf67ee57a4
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Thu Dec 15 16:46:44 2022 +0800

    add metrics: operator_execution
---
 .../apache/iotdb/metrics/config/MetricConfig.java  |  5 ++--
 .../iotdb/commons/service/metric/enums/Metric.java |  4 ++-
 .../iotdb/db/mpp/execution/driver/Driver.java      | 19 ++++++++++--
 .../iotdb/db/mpp/execution/operator/Operator.java  | 23 ++++++++++++++
 .../db/mpp/execution/operator/OperatorContext.java | 19 ++++++++++++
 .../operator/process/AbstractIntoOperator.java     |  4 +--
 .../operator/process/AggregationOperator.java      |  4 +--
 .../operator/process/DeviceMergeOperator.java      |  6 ++--
 .../operator/process/DeviceViewOperator.java       |  6 ++--
 .../execution/operator/process/FillOperator.java   |  4 +--
 .../operator/process/FilterAndProjectOperator.java |  4 +--
 .../execution/operator/process/LimitOperator.java  |  4 +--
 .../operator/process/LinearFillOperator.java       |  6 ++--
 .../operator/process/MergeSortOperator.java        |  6 ++--
 .../execution/operator/process/OffsetOperator.java |  4 +--
 .../process/RawDataAggregationOperator.java        |  8 ++---
 .../operator/process/SingleDeviceViewOperator.java |  6 ++--
 .../process/SingleInputAggregationOperator.java    |  2 +-
 .../process/SlidingWindowAggregationOperator.java  |  6 ++--
 .../operator/process/TagAggregationOperator.java   |  6 ++--
 .../operator/process/TransformOperator.java        |  2 +-
 .../process/join/RowBasedTimeJoinOperator.java     |  6 ++--
 .../operator/process/join/TimeJoinOperator.java    |  6 ++--
 .../process/join/VerticallyConcatOperator.java     |  6 ++--
 .../last/AbstractUpdateLastCacheOperator.java      |  2 +-
 .../last/AlignedUpdateLastCacheOperator.java       |  2 +-
 .../process/last/LastQueryCollectOperator.java     |  6 ++--
 .../process/last/LastQueryMergeOperator.java       |  6 ++--
 .../operator/process/last/LastQueryOperator.java   |  6 ++--
 .../process/last/LastQuerySortOperator.java        |  6 ++--
 .../process/last/UpdateLastCacheOperator.java      |  2 +-
 .../operator/schema/CountMergeOperator.java        |  6 ++--
 .../schema/LevelTimeSeriesCountOperator.java       |  2 +-
 .../schema/NodeManageMemoryMergeOperator.java      |  4 +--
 .../operator/schema/NodePathsConvertOperator.java  |  4 +--
 .../operator/schema/NodePathsCountOperator.java    |  4 +--
 .../operator/schema/SchemaFetchMergeOperator.java  |  6 ++--
 .../operator/schema/SchemaQueryMergeOperator.java  |  6 ++--
 .../schema/SchemaQueryOrderByHeatOperator.java     |  6 ++--
 .../operator/schema/SchemaQueryScanOperator.java   |  2 +-
 .../AbstractSeriesAggregationScanOperator.java     |  2 +-
 .../operator/source/LastCacheScanOperator.java     |  2 +-
 .../iotdb/db/mpp/metric/QueryMetricsManager.java   | 35 ++++++++++++++++++----
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  5 ++--
 .../apache/iotdb/db/mpp/plan/analyze/Analyzer.java |  2 +-
 .../db/mpp/plan/execution/QueryExecution.java      |  2 +-
 .../db/mpp/plan/parser/StatementGenerator.java     |  2 +-
 .../iotdb/db/mpp/plan/planner/LogicalPlanner.java  |  3 +-
 .../dag/input/TsBlockInputDataSet.java             |  6 ++--
 49 files changed, 190 insertions(+), 105 deletions(-)

diff --git a/metrics/interface/src/main/java/org/apache/iotdb/metrics/config/MetricConfig.java b/metrics/interface/src/main/java/org/apache/iotdb/metrics/config/MetricConfig.java
index b17410e12d..3d50eac41d 100644
--- a/metrics/interface/src/main/java/org/apache/iotdb/metrics/config/MetricConfig.java
+++ b/metrics/interface/src/main/java/org/apache/iotdb/metrics/config/MetricConfig.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.metrics.utils.ReporterType;
 
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 
@@ -34,7 +34,8 @@ public class MetricConfig {
   private MetricFrameType metricFrameType = MetricFrameType.MICROMETER;
 
   /** The list of reporters provide metrics for external tool */
-  private List<ReporterType> metricReporterList = Collections.singletonList(ReporterType.JMX);
+  private List<ReporterType> metricReporterList =
+      Arrays.asList(ReporterType.JMX, ReporterType.PROMETHEUS);
 
   /** The level of metric service */
   private MetricLevel metricLevel = MetricLevel.IMPORTANT;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index cc9b017328..1c0ab35d31 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -62,7 +62,9 @@ public enum Metric {
   THRIFT_ACTIVE_THREADS,
   IOT_CONSENSUS,
   STAGE,
-  QUERY_PLAN_COST;
+  QUERY_PLAN_COST,
+  OPERATOR_EXECUTION_COST,
+  OPERATOR_EXECUTION_COUNT;
 
   @Override
   public String toString() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index 086234fa12..92d8574294 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.mpp.execution.driver;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.collect.ImmutableList;
@@ -58,6 +60,8 @@ public abstract class Driver implements IDriver {
 
   protected final DriverLock exclusiveLock = new DriverLock();
 
+  protected final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+
   protected enum State {
     ALIVE,
     NEED_DESTRUCTION,
@@ -182,8 +186,8 @@ public abstract class Driver implements IDriver {
       if (!blocked.isDone()) {
         return blocked;
       }
-      if (root.hasNext()) {
-        TsBlock tsBlock = root.next();
+      if (root.hasNextWithTimer()) {
+        TsBlock tsBlock = root.nextWithTimer();
         if (tsBlock != null && !tsBlock.isEmpty()) {
           sinkHandle.send(tsBlock);
         }
@@ -332,6 +336,17 @@ public abstract class Driver implements IDriver {
     try {
       root.close();
       sinkHandle.setNoMoreTsBlocks();
+
+      // record operator execution statistics to metrics
+      List<OperatorContext> operatorContexts =
+          driverContext.getFragmentInstanceContext().getOperatorContexts();
+      for (OperatorContext operatorContext : operatorContexts) {
+        String operatorType = operatorContext.getOperatorType();
+        QUERY_METRICS.recordOperatorExecutionCost(
+            operatorType, operatorContext.getTotalExecutionTimeInNanos());
+        QUERY_METRICS.recordOperatorExecutionCount(
+            operatorType, operatorContext.getNextCalledCount());
+      }
     } catch (InterruptedException t) {
       // don't record the stack
       wasInterrupted = true;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
index b7b05a9289..aaf404d467 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
@@ -38,9 +38,32 @@ public interface Operator extends AutoCloseable {
     return NOT_BLOCKED;
   }
 
+  default TsBlock nextWithTimer() {
+    OperatorContext context = getOperatorContext();
+    long startTime = System.nanoTime();
+
+    try {
+      return next();
+    } finally {
+      context.recordExecutionTime(System.nanoTime() - startTime);
+      context.recordNextCalled();
+    }
+  }
+
   /** Gets next tsBlock from this operator. If no data is currently available, return null. */
   TsBlock next();
 
+  default boolean hasNextWithTimer() {
+    OperatorContext context = getOperatorContext();
+    long startTime = System.nanoTime();
+
+    try {
+      return hasNext();
+    } finally {
+      context.recordExecutionTime(System.nanoTime() - startTime);
+    }
+  }
+
   /** @return true if the operator has more data, otherwise false */
   boolean hasNext();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java
index 447066f4c8..3e9620a1b2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java
@@ -40,6 +40,9 @@ public class OperatorContext {
 
   private Duration maxRunTime;
 
+  private long totalExecutionTimeInNanos = 0L;
+  private long nextCalledCount = 0L;
+
   public OperatorContext(
       int operatorId,
       PlanNodeId planNodeId,
@@ -75,6 +78,22 @@ public class OperatorContext {
     return instanceContext.getSessionInfo();
   }
 
+  public void recordExecutionTime(long executionTimeInNanos) {
+    this.totalExecutionTimeInNanos += executionTimeInNanos;
+  }
+
+  public void recordNextCalled() {
+    this.nextCalledCount++;
+  }
+
+  public long getTotalExecutionTimeInNanos() {
+    return totalExecutionTimeInNanos;
+  }
+
+  public long getNextCalledCount() {
+    return nextCalledCount;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 5f4b8dda34..ad24a16ade 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -139,8 +139,8 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
     }
     cachedTsBlock = null;
 
-    if (child.hasNext()) {
-      TsBlock inputTsBlock = child.next();
+    if (child.hasNextWithTimer()) {
+      TsBlock inputTsBlock = child.nextWithTimer();
       processTsBlock(inputTsBlock);
 
       // call child.next only once
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 0d7a6c89f7..aa86c6585a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -175,7 +175,7 @@ public class AggregationOperator implements ProcessOperator {
 
   @Override
   public boolean isFinished() {
-    return !this.hasNext();
+    return !this.hasNextWithTimer();
   }
 
   @Override
@@ -194,7 +194,7 @@ public class AggregationOperator implements ProcessOperator {
         return false;
       }
 
-      inputTsBlocks[i] = children.get(i).next();
+      inputTsBlocks[i] = children.get(i).nextWithTimer();
       canCallNext[i] = false;
       if (inputTsBlocks[i] == null) {
         return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
index 511a75991b..c08895ceaa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
@@ -115,8 +115,8 @@ public class DeviceMergeOperator implements ProcessOperator {
   public TsBlock next() {
     // get new input TsBlock
     for (int i = 0; i < inputOperatorsCount; i++) {
-      if (!noMoreTsBlocks[i] && isTsBlockEmpty(i) && deviceOperators.get(i).hasNext()) {
-        inputTsBlocks[i] = deviceOperators.get(i).next();
+      if (!noMoreTsBlocks[i] && isTsBlockEmpty(i) && deviceOperators.get(i).hasNextWithTimer()) {
+        inputTsBlocks[i] = deviceOperators.get(i).nextWithTimer();
         if (inputTsBlocks[i] == null || inputTsBlocks[i].isEmpty()) {
           return null;
         }
@@ -204,7 +204,7 @@ public class DeviceMergeOperator implements ProcessOperator {
       if (!isTsBlockEmpty(i)) {
         return true;
       } else if (!noMoreTsBlocks[i]) {
-        if (deviceOperators.get(i).hasNext()) {
+        if (deviceOperators.get(i).hasNextWithTimer()) {
           return true;
         } else {
           noMoreTsBlocks[i] = true;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
index 9a3a5e6634..242586a78e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
@@ -108,11 +108,11 @@ public class DeviceViewOperator implements ProcessOperator {
 
   @Override
   public TsBlock next() {
-    if (!getCurDeviceOperator().hasNext()) {
+    if (!getCurDeviceOperator().hasNextWithTimer()) {
       deviceIndex++;
       return null;
     }
-    TsBlock tsBlock = getCurDeviceOperator().next();
+    TsBlock tsBlock = getCurDeviceOperator().nextWithTimer();
     if (tsBlock == null) {
       return null;
     }
@@ -151,7 +151,7 @@ public class DeviceViewOperator implements ProcessOperator {
 
   @Override
   public boolean isFinished() {
-    return !this.hasNext();
+    return !this.hasNextWithTimer();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
index c182168bd3..92ec771f32 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
@@ -58,7 +58,7 @@ public class FillOperator implements ProcessOperator {
 
   @Override
   public TsBlock next() {
-    TsBlock block = child.next();
+    TsBlock block = child.nextWithTimer();
     if (block == null) {
       return null;
     }
@@ -79,7 +79,7 @@ public class FillOperator implements ProcessOperator {
 
   @Override
   public boolean hasNext() {
-    return child.hasNext();
+    return child.hasNextWithTimer();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
index fcb03660a8..c08fcfd27d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
@@ -95,7 +95,7 @@ public class FilterAndProjectOperator implements ProcessOperator {
 
   @Override
   public TsBlock next() {
-    TsBlock input = inputOperator.next();
+    TsBlock input = inputOperator.nextWithTimer();
     if (input == null) {
       return null;
     }
@@ -193,7 +193,7 @@ public class FilterAndProjectOperator implements ProcessOperator {
 
   @Override
   public boolean hasNext() {
-    return inputOperator.hasNext();
+    return inputOperator.hasNextWithTimer();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
index 726f5e7ecf..68604cd1ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
@@ -52,7 +52,7 @@ public class LimitOperator implements ProcessOperator {
 
   @Override
   public TsBlock next() {
-    TsBlock block = child.next();
+    TsBlock block = child.nextWithTimer();
     if (block == null) {
       return null;
     }
@@ -68,7 +68,7 @@ public class LimitOperator implements ProcessOperator {
 
   @Override
   public boolean hasNext() {
-    return remainingLimit > 0 && child.hasNext();
+    return remainingLimit > 0 && child.hasNextWithTimer();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
index bcbb92a932..3792829196 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
@@ -87,7 +87,7 @@ public class LinearFillOperator implements ProcessOperator {
     // make sure we call child.next() at most once
     if (cachedTsBlock.isEmpty()) {
       canCallNext = false;
-      TsBlock nextTsBlock = child.next();
+      TsBlock nextTsBlock = child.nextWithTimer();
       // child operator's calculation is not finished, so we just return null
       if (nextTsBlock == null || nextTsBlock.isEmpty()) {
         return nextTsBlock;
@@ -144,7 +144,7 @@ public class LinearFillOperator implements ProcessOperator {
   @Override
   public boolean hasNext() {
     // if child.hasNext() return false, it means that there is no more tsBlocks
-    noMoreTsBlock = !child.hasNext();
+    noMoreTsBlock = !child.hasNextWithTimer();
     // if there is more tsBlock, we can call child.next() once
     canCallNext = !noMoreTsBlock;
     return !cachedTsBlock.isEmpty() || !noMoreTsBlock;
@@ -212,7 +212,7 @@ public class LinearFillOperator implements ProcessOperator {
     if (canCallNext) { // if we can call child.next(), we call that and cache it in
       // cachedTsBlock
       canCallNext = false;
-      TsBlock nextTsBlock = child.next();
+      TsBlock nextTsBlock = child.nextWithTimer();
       // child operator's calculation is not finished, so we just return null
       if (nextTsBlock == null || nextTsBlock.isEmpty()) {
         return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
index 372d01cb23..db11810fa2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
@@ -96,8 +96,8 @@ public class MergeSortOperator implements ProcessOperator {
   public TsBlock next() {
     // 1. fill consumed up TsBlock
     for (int i = 0; i < inputOperatorsCount; i++) {
-      if (!noMoreTsBlocks[i] && isTsBlockEmpty(i) && inputOperators.get(i).hasNext()) {
-        inputTsBlocks[i] = inputOperators.get(i).next();
+      if (!noMoreTsBlocks[i] && isTsBlockEmpty(i) && inputOperators.get(i).hasNextWithTimer()) {
+        inputTsBlocks[i] = inputOperators.get(i).nextWithTimer();
         if (inputTsBlocks[i] == null || inputTsBlocks[i].isEmpty()) {
           return null;
         }
@@ -160,7 +160,7 @@ public class MergeSortOperator implements ProcessOperator {
       if (!isTsBlockEmpty(i)) {
         return true;
       } else if (!noMoreTsBlocks[i]) {
-        if (inputOperators.get(i).hasNext()) {
+        if (inputOperators.get(i).hasNextWithTimer()) {
           return true;
         } else {
           noMoreTsBlocks[i] = true;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
index 572738d081..e6f146eac1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
@@ -52,7 +52,7 @@ public class OffsetOperator implements ProcessOperator {
 
   @Override
   public TsBlock next() {
-    TsBlock block = child.next();
+    TsBlock block = child.nextWithTimer();
     if (block == null) {
       return null;
     }
@@ -67,7 +67,7 @@ public class OffsetOperator implements ProcessOperator {
 
   @Override
   public boolean hasNext() {
-    return child.hasNext();
+    return child.hasNextWithTimer();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index 30488c18d3..b5ce5ef823 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -58,7 +58,7 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
   }
 
   private boolean hasMoreData() {
-    return inputTsBlock != null || child.hasNext();
+    return inputTsBlock != null || child.hasNextWithTimer();
   }
 
   @Override
@@ -72,10 +72,10 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
       inputTsBlock = null;
 
       // NOTE: child.next() can only be invoked once
-      if (child.hasNext() && canCallNext) {
-        inputTsBlock = child.next();
+      if (child.hasNextWithTimer() && canCallNext) {
+        inputTsBlock = child.nextWithTimer();
         canCallNext = false;
-      } else if (child.hasNext()) {
+      } else if (child.hasNextWithTimer()) {
         // if child still has next but can't be invoked now
         return false;
       } else {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleDeviceViewOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleDeviceViewOperator.java
index 0107af8121..a22d52c9e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleDeviceViewOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleDeviceViewOperator.java
@@ -82,7 +82,7 @@ public class SingleDeviceViewOperator implements ProcessOperator {
 
   @Override
   public TsBlock next() {
-    TsBlock tsBlock = deviceOperator.next();
+    TsBlock tsBlock = deviceOperator.nextWithTimer();
     if (tsBlock == null) {
       return null;
     }
@@ -104,7 +104,7 @@ public class SingleDeviceViewOperator implements ProcessOperator {
 
   @Override
   public boolean hasNext() {
-    return deviceOperator.hasNext();
+    return deviceOperator.hasNextWithTimer();
   }
 
   @Override
@@ -114,7 +114,7 @@ public class SingleDeviceViewOperator implements ProcessOperator {
 
   @Override
   public boolean isFinished() {
-    return !this.hasNext();
+    return !this.hasNextWithTimer();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
index 16071aea1e..8953755d5e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
@@ -108,7 +108,7 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
 
   @Override
   public boolean isFinished() {
-    return !this.hasNext();
+    return !this.hasNextWithTimer();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
index 76499849cb..805b183677 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -78,10 +78,10 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper
     while (!isCalculationDone()) {
       if (inputTsBlock == null) {
         // NOTE: child.next() can only be invoked once
-        if (child.hasNext() && canCallNext) {
-          inputTsBlock = child.next();
+        if (child.hasNextWithTimer() && canCallNext) {
+          inputTsBlock = child.nextWithTimer();
           canCallNext = false;
-        } else if (child.hasNext()) {
+        } else if (child.hasNextWithTimer()) {
           // if child still has next but can't be invoked now
           return false;
         } else {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
index cf18c26b74..778e0d28c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
@@ -127,7 +127,7 @@ public class TagAggregationOperator implements ProcessOperator {
       }
 
       // If the data is unavailable first, try to find next tsblock of the child.
-      inputTsBlocks[i] = children.get(i).next();
+      inputTsBlocks[i] = children.get(i).nextWithTimer();
       consumedIndices[i] = 0;
       canCallNext[i] = false;
 
@@ -186,7 +186,7 @@ public class TagAggregationOperator implements ProcessOperator {
   @Override
   public boolean hasNext() {
     for (int i = 0; i < children.size(); i++) {
-      if (dataUnavailable(i) && !children.get(i).hasNext()) {
+      if (dataUnavailable(i) && !children.get(i).hasNextWithTimer()) {
         return false;
       }
     }
@@ -195,7 +195,7 @@ public class TagAggregationOperator implements ProcessOperator {
 
   @Override
   public boolean isFinished() {
-    return !this.hasNext();
+    return !this.hasNextWithTimer();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
index 4346ed231e..2e80c93139 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
@@ -342,7 +342,7 @@ public class TransformOperator implements ProcessOperator {
   @Override
   public boolean isFinished() {
     // call hasNext first, or data of inputOperator could be missing
-    boolean flag = !hasNext();
+    boolean flag = !hasNextWithTimer();
     return timeHeap.isEmpty() && (flag || inputOperator.isFinished());
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
index 695893d400..1c4211d003 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
@@ -137,9 +137,9 @@ public class RowBasedTimeJoinOperator implements ProcessOperator {
     // among all the input TsBlock as the current output TsBlock's endTime.
     for (int i = 0; i < inputOperatorsCount; i++) {
       if (!noMoreTsBlocks[i] && empty(i)) {
-        if (children.get(i).hasNext()) {
+        if (children.get(i).hasNextWithTimer()) {
           inputIndex[i] = 0;
-          inputTsBlocks[i] = children.get(i).next();
+          inputTsBlocks[i] = children.get(i).nextWithTimer();
           if (!empty(i)) {
             updateTimeSelector(i);
           } else {
@@ -210,7 +210,7 @@ public class RowBasedTimeJoinOperator implements ProcessOperator {
       if (!empty(i)) {
         return true;
       } else if (!noMoreTsBlocks[i]) {
-        if (children.get(i).hasNext()) {
+        if (children.get(i).hasNextWithTimer()) {
           return true;
         } else {
           noMoreTsBlocks[i] = true;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
index 3170838986..18bcd8836a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
@@ -137,9 +137,9 @@ public class TimeJoinOperator implements ProcessOperator {
     // among all the input TsBlock as the current output TsBlock's endTime.
     for (int i = 0; i < inputOperatorsCount; i++) {
       if (!noMoreTsBlocks[i] && empty(i)) {
-        if (children.get(i).hasNext()) {
+        if (children.get(i).hasNextWithTimer()) {
           inputIndex[i] = 0;
-          inputTsBlocks[i] = children.get(i).next();
+          inputTsBlocks[i] = children.get(i).nextWithTimer();
           if (!empty(i)) {
             int rowSize = inputTsBlocks[i].getPositionCount();
             for (int row = 0; row < rowSize; row++) {
@@ -209,7 +209,7 @@ public class TimeJoinOperator implements ProcessOperator {
       if (!empty(i)) {
         return true;
       } else if (!noMoreTsBlocks[i]) {
-        if (children.get(i).hasNext()) {
+        if (children.get(i).hasNextWithTimer()) {
           return true;
         } else {
           noMoreTsBlocks[i] = true;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/VerticallyConcatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/VerticallyConcatOperator.java
index 2a4ffd58c5..1fe4390082 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/VerticallyConcatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/VerticallyConcatOperator.java
@@ -99,7 +99,7 @@ public class VerticallyConcatOperator implements ProcessOperator {
     for (int i = 0; i < inputOperatorsCount; i++) {
       if (empty(i)) {
         inputIndex[i] = 0;
-        inputTsBlocks[i] = children.get(i).next();
+        inputTsBlocks[i] = children.get(i).nextWithTimer();
         if (empty(i)) {
           // child operator has not prepared TsBlock well
           return null;
@@ -144,7 +144,7 @@ public class VerticallyConcatOperator implements ProcessOperator {
     if (finished) {
       return false;
     }
-    return !empty(0) || children.get(0).hasNext();
+    return !empty(0) || children.get(0).hasNextWithTimer();
   }
 
   @Override
@@ -159,7 +159,7 @@ public class VerticallyConcatOperator implements ProcessOperator {
     if (finished) {
       return true;
     }
-    return finished = empty(0) && !children.get(0).hasNext();
+    return finished = empty(0) && !children.get(0).hasNextWithTimer();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
index 4850fb45c4..3c182e0668 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
@@ -82,7 +82,7 @@ public abstract class AbstractUpdateLastCacheOperator implements ProcessOperator
 
   @Override
   public boolean hasNext() {
-    return child.hasNext();
+    return child.hasNextWithTimer();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
index 14354cbc3b..f78c19e4fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
@@ -51,7 +51,7 @@ public class AlignedUpdateLastCacheOperator extends AbstractUpdateLastCacheOpera
 
   @Override
   public TsBlock next() {
-    TsBlock res = child.next();
+    TsBlock res = child.nextWithTimer();
     if (res == null) {
       return null;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
index b91e346372..b69ecfa388 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
@@ -61,8 +61,8 @@ public class LastQueryCollectOperator implements ProcessOperator {
 
   @Override
   public TsBlock next() {
-    if (children.get(currentIndex).hasNext()) {
-      return children.get(currentIndex).next();
+    if (children.get(currentIndex).hasNextWithTimer()) {
+      return children.get(currentIndex).nextWithTimer();
     } else {
       currentIndex++;
       return null;
@@ -83,7 +83,7 @@ public class LastQueryCollectOperator implements ProcessOperator {
 
   @Override
   public boolean isFinished() {
-    return !hasNext();
+    return !hasNextWithTimer();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
index 5ad04bdc2c..fcc074b024 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
@@ -116,9 +116,9 @@ public class LastQueryMergeOperator implements ProcessOperator {
     // among all the input TsBlock as the current output TsBlock's endTimeSeries.
     for (int i = 0; i < inputOperatorsCount; i++) {
       if (!noMoreTsBlocks[i] && empty(i)) {
-        if (children.get(i).hasNext()) {
+        if (children.get(i).hasNextWithTimer()) {
           inputIndex[i] = 0;
-          inputTsBlocks[i] = children.get(i).next();
+          inputTsBlocks[i] = children.get(i).nextWithTimer();
           if (!empty(i)) {
             int rowSize = inputTsBlocks[i].getPositionCount();
             for (int row = 0; row < rowSize; row++) {
@@ -187,7 +187,7 @@ public class LastQueryMergeOperator implements ProcessOperator {
       if (!empty(i)) {
         return true;
       } else if (!noMoreTsBlocks[i]) {
-        if (children.get(i).hasNext()) {
+        if (children.get(i).hasNextWithTimer()) {
           return true;
         } else {
           noMoreTsBlocks[i] = true;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
index 7a689d4aa0..c0f825e02a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
@@ -104,8 +104,8 @@ public class LastQueryOperator implements ProcessOperator {
     while ((System.nanoTime() - start < maxRuntime)
         && (currentIndex < endIndex)
         && !tsBlockBuilder.isFull()) {
-      if (children.get(currentIndex).hasNext()) {
-        TsBlock tsBlock = children.get(currentIndex).next();
+      if (children.get(currentIndex).hasNextWithTimer()) {
+        TsBlock tsBlock = children.get(currentIndex).nextWithTimer();
         if (tsBlock == null) {
           return null;
         } else if (!tsBlock.isEmpty()) {
@@ -127,7 +127,7 @@ public class LastQueryOperator implements ProcessOperator {
 
   @Override
   public boolean isFinished() {
-    return !hasNext();
+    return !hasNextWithTimer();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
index aeb38b6b7d..3e16a8e6d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
@@ -148,8 +148,8 @@ public class LastQuerySortOperator implements ProcessOperator {
           previousTsBlock = null;
         }
       } else {
-        if (children.get(currentIndex).hasNext()) {
-          TsBlock tsBlock = children.get(currentIndex).next();
+        if (children.get(currentIndex).hasNextWithTimer()) {
+          TsBlock tsBlock = children.get(currentIndex).nextWithTimer();
           if (tsBlock == null) {
             return null;
           } else if (!tsBlock.isEmpty()) {
@@ -189,7 +189,7 @@ public class LastQuerySortOperator implements ProcessOperator {
 
   @Override
   public boolean isFinished() {
-    return !hasNext();
+    return !hasNextWithTimer();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
index 7315c81f46..69972a40e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
@@ -53,7 +53,7 @@ public class UpdateLastCacheOperator extends AbstractUpdateLastCacheOperator {
 
   @Override
   public TsBlock next() {
-    TsBlock res = child.next();
+    TsBlock res = child.nextWithTimer();
     if (res == null) {
       return null;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
index 61a8758770..32f215716e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
@@ -96,8 +96,8 @@ public class CountMergeOperator implements ProcessOperator {
       if (childrenTsBlocks[i] == null) {
         // when this operator is not blocked, it means all children that have not return TsBlock is
         // not blocked.
-        if (children.get(i).hasNext()) {
-          TsBlock tsBlock = children.get(i).next();
+        if (children.get(i).hasNextWithTimer()) {
+          TsBlock tsBlock = children.get(i).nextWithTimer();
           if (tsBlock == null || tsBlock.isEmpty()) {
             allChildrenReady = false;
           } else {
@@ -164,7 +164,7 @@ public class CountMergeOperator implements ProcessOperator {
 
   @Override
   public boolean isFinished() {
-    return !hasNext();
+    return !hasNextWithTimer();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
index 9a4f471eda..ab33978776 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
@@ -140,7 +140,7 @@ public class LevelTimeSeriesCountOperator implements SourceOperator {
 
   @Override
   public boolean isFinished() {
-    return !hasNext();
+    return !hasNextWithTimer();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
index 781ba46695..d9315fcdbc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
@@ -79,7 +79,7 @@ public class NodeManageMemoryMergeOperator implements ProcessOperator {
       isReadingMemory = false;
       return transferToTsBlock(data);
     } else {
-      TsBlock block = child.next();
+      TsBlock block = child.nextWithTimer();
       if (block == null) {
         return null;
       }
@@ -126,7 +126,7 @@ public class NodeManageMemoryMergeOperator implements ProcessOperator {
 
   @Override
   public boolean hasNext() {
-    return isReadingMemory || child.hasNext();
+    return isReadingMemory || child.hasNextWithTimer();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
index 217a40bd17..4d04a62e86 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
@@ -70,7 +70,7 @@ public class NodePathsConvertOperator implements ProcessOperator {
 
   @Override
   public TsBlock next() {
-    TsBlock block = child.next();
+    TsBlock block = child.nextWithTimer();
     if (block == null || block.isEmpty()) {
       return null;
     }
@@ -95,7 +95,7 @@ public class NodePathsConvertOperator implements ProcessOperator {
 
   @Override
   public boolean hasNext() {
-    return child.hasNext();
+    return child.hasNextWithTimer();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
index cf026777fa..3a662afe13 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
@@ -71,8 +71,8 @@ public class NodePathsCountOperator implements ProcessOperator {
       if (!blocked.isDone()) {
         return null;
       }
-      if (child.hasNext()) {
-        TsBlock tsBlock = child.next();
+      if (child.hasNextWithTimer()) {
+        TsBlock tsBlock = child.nextWithTimer();
         if (null != tsBlock && !tsBlock.isEmpty()) {
           for (int i = 0; i < tsBlock.getPositionCount(); i++) {
             String path = tsBlock.getColumn(0).getBinary(i).toString();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
index 07a531e80c..977cd388df 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
@@ -72,8 +72,8 @@ public class SchemaFetchMergeOperator implements ProcessOperator {
       return generateStorageGroupInfo();
     }
 
-    if (children.get(currentIndex).hasNext()) {
-      return children.get(currentIndex).next();
+    if (children.get(currentIndex).hasNextWithTimer()) {
+      return children.get(currentIndex).nextWithTimer();
     } else {
       currentIndex++;
       return null;
@@ -94,7 +94,7 @@ public class SchemaFetchMergeOperator implements ProcessOperator {
 
   @Override
   public boolean isFinished() {
-    return !hasNext();
+    return !hasNextWithTimer();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
index f2671034e8..0ad36268a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
@@ -51,8 +51,8 @@ public class SchemaQueryMergeOperator implements ProcessOperator {
 
   @Override
   public TsBlock next() {
-    if (children.get(currentIndex).hasNext()) {
-      return children.get(currentIndex).next();
+    if (children.get(currentIndex).hasNextWithTimer()) {
+      return children.get(currentIndex).nextWithTimer();
     } else {
       currentIndex++;
       return null;
@@ -71,7 +71,7 @@ public class SchemaQueryMergeOperator implements ProcessOperator {
 
   @Override
   public boolean isFinished() {
-    return !hasNext();
+    return !hasNextWithTimer();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
index 68419ec464..945961f828 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
@@ -87,8 +87,8 @@ public class SchemaQueryOrderByHeatOperator implements ProcessOperator {
         if (operator.isFinished()) {
           noMoreTsBlocks[i] = true;
         } else {
-          if (operator.hasNext()) {
-            TsBlock tsBlock = operator.next();
+          if (operator.hasNextWithTimer()) {
+            TsBlock tsBlock = operator.nextWithTimer();
             if (null != tsBlock && !tsBlock.isEmpty()) {
               if (isShowTimeSeriesBlock(tsBlock)) {
                 showTimeSeriesResult.add(tsBlock);
@@ -203,7 +203,7 @@ public class SchemaQueryOrderByHeatOperator implements ProcessOperator {
 
   @Override
   public boolean isFinished() {
-    return !hasNext();
+    return !hasNextWithTimer();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
index 23bbcd4366..77bdc7b6cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
@@ -107,7 +107,7 @@ public abstract class SchemaQueryScanOperator implements SourceOperator {
 
   @Override
   public boolean isFinished() {
-    return !hasNext();
+    return !hasNextWithTimer();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index 15ad856245..ed6c92c199 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -168,7 +168,7 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
 
   @Override
   public boolean isFinished() {
-    return finished || (finished = !hasNext());
+    return finished || (finished = !hasNextWithTimer());
   }
 
   protected void calculateNextAggregationResult() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
index 974758f8a5..6c5185fbf2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
@@ -54,7 +54,7 @@ public class LastCacheScanOperator implements SourceOperator {
 
   @Override
   public boolean isFinished() {
-    return !hasNext();
+    return !hasNextWithTimer();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java
index 8a4e878733..dac48b1680 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java
@@ -22,18 +22,41 @@ package org.apache.iotdb.db.mpp.metric;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
-import org.apache.iotdb.metrics.type.Timer;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 
+import java.util.concurrent.TimeUnit;
+
 public class QueryMetricsManager {
 
   private final MetricService metricService = MetricService.getInstance();
 
-  public void addPlanCost(String stage, long costTimeInNanos) {
-    Timer timer =
-        metricService.getOrCreateTimer(
-            Metric.QUERY_PLAN_COST.toString(), MetricLevel.IMPORTANT, Tag.STAGE.toString(), stage);
-    timer.updateNanos(costTimeInNanos);
+  public void recordPlanCost(String stage, long costTimeInNanos) {
+    metricService.timer(
+        costTimeInNanos,
+        TimeUnit.NANOSECONDS,
+        Metric.QUERY_PLAN_COST.toString(),
+        MetricLevel.IMPORTANT,
+        Tag.STAGE.toString(),
+        stage);
+  }
+
+  public void recordOperatorExecutionCost(String operatorType, long costTimeInNanos) {
+    metricService.timer(
+        costTimeInNanos,
+        TimeUnit.NANOSECONDS,
+        Metric.OPERATOR_EXECUTION_COST.toString(),
+        MetricLevel.IMPORTANT,
+        Tag.NAME.toString(),
+        operatorType);
+  }
+
+  public void recordOperatorExecutionCount(String operatorType, long count) {
+    metricService.count(
+        count,
+        Metric.OPERATOR_EXECUTION_COUNT.toString(),
+        MetricLevel.IMPORTANT,
+        Tag.NAME.toString(),
+        operatorType);
   }
 
   public static QueryMetricsManager getInstance() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 8b582fb05a..00d63837ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -211,7 +211,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       } else {
         schemaTree = schemaFetcher.fetchSchema(patternTree);
       }
-      QueryMetricsManager.getInstance().addPlanCost(SCHEMA_FETCHER, System.nanoTime() - startTime);
+      QueryMetricsManager.getInstance()
+          .recordPlanCost(SCHEMA_FETCHER, System.nanoTime() - startTime);
       logger.debug("[EndFetchSchema]");
 
       // If there is no leaf node in the schema tree, the query should be completed immediately
@@ -1175,7 +1176,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       }
     } finally {
       QueryMetricsManager.getInstance()
-          .addPlanCost(PARTITION_FETCHER, System.nanoTime() - startTime);
+          .recordPlanCost(PARTITION_FETCHER, System.nanoTime() - startTime);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index 679a71627f..54d6bc0e84 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -47,7 +47,7 @@ public class Analyzer {
         new AnalyzeVisitor(partitionFetcher, schemaFetcher).process(statement, context);
 
     if (statement.isQuery()) {
-      QueryMetricsManager.getInstance().addPlanCost(ANALYZER, System.nanoTime() - startTime);
+      QueryMetricsManager.getInstance().recordPlanCost(ANALYZER, System.nanoTime() - startTime);
     }
     return analysis;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index c150183527..23e8c75029 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -308,7 +308,7 @@ public class QueryExecution implements IQueryExecution {
 
     if (rawStatement.isQuery()) {
       QueryMetricsManager.getInstance()
-          .addPlanCost(DISTRIBUTION_PLANNER, System.nanoTime() - startTime);
+          .recordPlanCost(DISTRIBUTION_PLANNER, System.nanoTime() - startTime);
     }
     if (isQuery() && logger.isDebugEnabled()) {
       logger.debug(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index 604442462e..f69362f66a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -520,7 +520,7 @@ public class StatementGenerator {
       }
       return astVisitor.visit(tree);
     } finally {
-      QueryMetricsManager.getInstance().addPlanCost(SQL_PARSER, System.nanoTime() - startTime);
+      QueryMetricsManager.getInstance().recordPlanCost(SQL_PARSER, System.nanoTime() - startTime);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index cc25e475ac..47d9b63e10 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -46,7 +46,8 @@ public class LogicalPlanner {
 
     // optimize the query logical plan
     if (analysis.getStatement().isQuery()) {
-      QueryMetricsManager.getInstance().addPlanCost(LOGICAL_PLANNER, System.nanoTime() - startTime);
+      QueryMetricsManager.getInstance()
+          .recordPlanCost(LOGICAL_PLANNER, System.nanoTime() - startTime);
 
       for (PlanOptimizer optimizer : optimizers) {
         rootNode = optimizer.optimize(rootNode, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/TsBlockInputDataSet.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/TsBlockInputDataSet.java
index b7790a2953..8f6629aba3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/TsBlockInputDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/TsBlockInputDataSet.java
@@ -55,10 +55,10 @@ public class TsBlockInputDataSet implements IUDFInputDataSet {
       if (operator.isBlocked() != Operator.NOT_BLOCKED) {
         return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
       }
-      if (!operator.hasNext()) {
+      if (!operator.hasNextWithTimer()) {
         return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
       }
-      final TsBlock tsBlock = operator.next();
+      final TsBlock tsBlock = operator.nextWithTimer();
       if (tsBlock == null) {
         return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
       }
@@ -72,7 +72,7 @@ public class TsBlockInputDataSet implements IUDFInputDataSet {
       if (operator.isBlocked() != Operator.NOT_BLOCKED) {
         return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
       }
-      return operator.hasNext()
+      return operator.hasNextWithTimer()
           ? YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA
           : YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
     }