Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/12/20 01:15:47 UTC

[iotdb] 01/05: add more metrics related to query execution

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/addQueryMetrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 34f2712aada80892248f14919f7eaaa5bd753ecb
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Dec 19 10:54:05 2022 +0800

    add more metrics related to query execution
---
 .../iotdb/commons/service/metric/enums/Metric.java |   5 +-
 .../iotdb/db/mpp/aggregation/Aggregator.java       |  84 ++++++++------
 .../iotdb/db/mpp/execution/driver/DataDriver.java  |  39 ++++---
 .../iotdb/db/mpp/execution/driver/Driver.java      |   5 +
 .../fragment/FragmentInstanceManager.java          |   8 +-
 .../db/mpp/metric/QueryExecutionMetricSet.java     | 124 +++++++++++++++++++++
 .../iotdb/db/mpp/metric/QueryMetricsManager.java   |  10 ++
 .../db/mpp/plan/execution/QueryExecution.java      |  22 +++-
 .../db/mpp/plan/scheduler/ClusterScheduler.java    |   8 ++
 .../scheduler/FragmentInstanceDispatcherImpl.java  |   7 ++
 10 files changed, 258 insertions(+), 54 deletions(-)
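
The hunks below all apply the same instrumentation pattern: take System.nanoTime() at the start of a stage and, in a finally block, pass the elapsed nanoseconds to the new QueryMetricsManager#recordExecutionCost(stage, cost) entry point, keyed by one of the String constants added in QueryExecutionMetricSet. A minimal sketch of that pattern, condensed from the hunks (the class name TimedStageSketch and the empty stage body are placeholders for illustration, not part of the commit):

    import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;

    import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.QUERY_RESOURCE_INIT;

    public class TimedStageSketch {

      private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();

      void runStage() {
        long startTime = System.nanoTime();
        try {
          // ... the actual work of the stage (resource init, dispatch, driver processing, ...)
        } finally {
          // recordExecutionCost resolves the stage key through QueryExecutionMetricSet.metricInfoMap
          // and records the elapsed nanoseconds on the corresponding TIMER metric.
          QUERY_METRICS.recordExecutionCost(QUERY_RESOURCE_INIT, System.nanoTime() - startTime);
        }
      }
    }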

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index e025895606..3d70034cd6 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -65,7 +65,10 @@ public enum Metric {
   QUERY_PLAN_COST,
   OPERATOR_EXECUTION_COST,
   OPERATOR_EXECUTION_COUNT,
-  SERIES_SCAN_COST;
+  SERIES_SCAN_COST,
+  DISPATCHER,
+  QUERY_EXECUTION,
+  AGGREGATION;
 
   @Override
   public String toString() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
index 9edc32bfa4..16120a1929 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.aggregation;
 
 import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.db.mpp.execution.operator.window.TimeWindow;
+import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -34,6 +35,8 @@ import java.util.Collections;
 import java.util.List;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA;
+import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.AGGREGATION_FROM_STATISTICS;
 
 public class Aggregator {
 
@@ -44,6 +47,8 @@ public class Aggregator {
 
   protected IWindow curWindow;
 
+  protected final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+
   // Used for SeriesAggregateScanOperator
   public Aggregator(Accumulator accumulator, AggregationStep step) {
     this.accumulator = accumulator;
@@ -62,43 +67,53 @@ public class Aggregator {
 
   // Used for SeriesAggregateScanOperator and RawDataAggregateOperator
   public int processTsBlock(TsBlock tsBlock) {
-    checkArgument(
-        step.isInputRaw(),
-        "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can only process raw input");
-    int lastReadReadIndex = 0;
-    for (InputLocation[] inputLocations : inputLocationList) {
+    long startTime = System.nanoTime();
+    try {
       checkArgument(
-          inputLocations[0].getTsBlockIndex() == 0,
-          "RawDataAggregateOperator can only process one tsBlock input.");
-      Column[] controlTimeAndValueColumn = new Column[3];
-      controlTimeAndValueColumn[0] = curWindow.getControlColumn(tsBlock);
-      controlTimeAndValueColumn[1] = tsBlock.getTimeColumn();
-      controlTimeAndValueColumn[2] = tsBlock.getColumn(inputLocations[0].getValueColumnIndex());
-      lastReadReadIndex =
-          Math.max(lastReadReadIndex, accumulator.addInput(controlTimeAndValueColumn, curWindow));
+          step.isInputRaw(),
+          "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can only process raw input");
+      int lastReadReadIndex = 0;
+      for (InputLocation[] inputLocations : inputLocationList) {
+        checkArgument(
+            inputLocations[0].getTsBlockIndex() == 0,
+            "RawDataAggregateOperator can only process one tsBlock input.");
+        Column[] controlTimeAndValueColumn = new Column[3];
+        controlTimeAndValueColumn[0] = curWindow.getControlColumn(tsBlock);
+        controlTimeAndValueColumn[1] = tsBlock.getTimeColumn();
+        controlTimeAndValueColumn[2] = tsBlock.getColumn(inputLocations[0].getValueColumnIndex());
+        lastReadReadIndex =
+            Math.max(lastReadReadIndex, accumulator.addInput(controlTimeAndValueColumn, curWindow));
+      }
+      return lastReadReadIndex;
+    } finally {
+      QUERY_METRICS.recordExecutionCost(AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
     }
-    return lastReadReadIndex;
   }
 
   // Used for AggregateOperator
   public void processTsBlocks(TsBlock[] tsBlock) {
-    checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot process raw input");
-    if (step.isInputFinal()) {
-      checkArgument(inputLocationList.size() == 1, "Final output can only be single column");
-      Column finalResult =
-          tsBlock[inputLocationList.get(0)[0].getTsBlockIndex()].getColumn(
-              inputLocationList.get(0)[0].getValueColumnIndex());
-      accumulator.setFinal(finalResult);
-    } else {
-      for (InputLocation[] inputLocations : inputLocationList) {
-        Column[] columns = new Column[inputLocations.length];
-        for (int i = 0; i < inputLocations.length; i++) {
-          columns[i] =
-              tsBlock[inputLocations[i].getTsBlockIndex()].getColumn(
-                  inputLocations[i].getValueColumnIndex());
+    long startTime = System.nanoTime();
+    try {
+      checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot process raw input");
+      if (step.isInputFinal()) {
+        checkArgument(inputLocationList.size() == 1, "Final output can only be single column");
+        Column finalResult =
+            tsBlock[inputLocationList.get(0)[0].getTsBlockIndex()].getColumn(
+                inputLocationList.get(0)[0].getValueColumnIndex());
+        accumulator.setFinal(finalResult);
+      } else {
+        for (InputLocation[] inputLocations : inputLocationList) {
+          Column[] columns = new Column[inputLocations.length];
+          for (int i = 0; i < inputLocations.length; i++) {
+            columns[i] =
+                tsBlock[inputLocations[i].getTsBlockIndex()].getColumn(
+                    inputLocations[i].getValueColumnIndex());
+          }
+          accumulator.addIntermediate(columns);
         }
-        accumulator.addIntermediate(columns);
       }
+    } finally {
+      QUERY_METRICS.recordExecutionCost(AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
     }
   }
 
@@ -112,9 +127,14 @@ public class Aggregator {
 
   /** Used for SeriesAggregateScanOperator. */
   public void processStatistics(Statistics[] statistics) {
-    for (InputLocation[] inputLocations : inputLocationList) {
-      int valueIndex = inputLocations[0].getValueColumnIndex();
-      accumulator.addStatistics(statistics[valueIndex]);
+    long startTime = System.nanoTime();
+    try {
+      for (InputLocation[] inputLocations : inputLocationList) {
+        int valueIndex = inputLocations[0].getValueColumnIndex();
+        accumulator.addStatistics(statistics[valueIndex]);
+      }
+    } finally {
+      QUERY_METRICS.recordExecutionCost(AGGREGATION_FROM_STATISTICS, System.nanoTime() - startTime);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
index 3fc0aaa042..79224545b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
@@ -40,6 +40,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.QUERY_RESOURCE_INIT;
+
 /**
  * One dataDriver is responsible for one FragmentInstance which is for data query, which may
  * contains several series.
@@ -97,23 +99,28 @@ public class DataDriver extends Driver {
    * we should change all the blocked lock operation into tryLock
    */
   private void initialize() throws QueryProcessException {
-    List<DataSourceOperator> sourceOperators =
-        ((DataDriverContext) driverContext).getSourceOperators();
-    if (sourceOperators != null && !sourceOperators.isEmpty()) {
-      QueryDataSource dataSource = initQueryDataSource();
-      sourceOperators.forEach(
-          sourceOperator -> {
-            // construct QueryDataSource for source operator
-            QueryDataSource queryDataSource =
-                new QueryDataSource(dataSource.getSeqResources(), dataSource.getUnseqResources());
-
-            queryDataSource.setDataTTL(dataSource.getDataTTL());
-
-            sourceOperator.initQueryDataSource(queryDataSource);
-          });
-    }
+    long startTime = System.nanoTime();
+    try {
+      List<DataSourceOperator> sourceOperators =
+          ((DataDriverContext) driverContext).getSourceOperators();
+      if (sourceOperators != null && !sourceOperators.isEmpty()) {
+        QueryDataSource dataSource = initQueryDataSource();
+        sourceOperators.forEach(
+            sourceOperator -> {
+              // construct QueryDataSource for source operator
+              QueryDataSource queryDataSource =
+                  new QueryDataSource(dataSource.getSeqResources(), dataSource.getUnseqResources());
+
+              queryDataSource.setDataTTL(dataSource.getDataTTL());
+
+              sourceOperator.initQueryDataSource(queryDataSource);
+            });
+      }
 
-    this.init = true;
+      this.init = true;
+    } finally {
+      QUERY_METRICS.recordExecutionCost(QUERY_RESOURCE_INIT, System.nanoTime() - startTime);
+    }
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index 92d8574294..a551174d67 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -47,6 +47,7 @@ import static com.google.common.base.Throwables.throwIfUnchecked;
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static java.lang.Boolean.TRUE;
 import static org.apache.iotdb.db.mpp.execution.operator.Operator.NOT_BLOCKED;
+import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.DRIVER_INTERNAL_PROCESS;
 
 public abstract class Driver implements IDriver {
 
@@ -177,6 +178,7 @@ public abstract class Driver implements IDriver {
   }
 
   private ListenableFuture<?> processInternal() {
+    long startTimeNanos = System.nanoTime();
     try {
       ListenableFuture<?> blocked = root.isBlocked();
       if (!blocked.isDone()) {
@@ -209,6 +211,9 @@ public abstract class Driver implements IDriver {
       newException.addSuppressed(t);
       driverContext.failed(newException);
       throw newException;
+    } finally {
+      QUERY_METRICS.recordExecutionCost(
+          DRIVER_INTERNAL_PROCESS, System.nanoTime() - startTimeNanos);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 5e37ac29aa..42a133ecb9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.mpp.execution.driver.DataDriver;
 import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
 import org.apache.iotdb.db.mpp.execution.schedule.DriverScheduler;
 import org.apache.iotdb.db.mpp.execution.schedule.IDriverScheduler;
+import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.db.mpp.plan.planner.LocalExecutionPlanner;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.utils.SetThreadName;
@@ -47,6 +48,7 @@ import java.util.concurrent.TimeoutException;
 import static java.util.Objects.requireNonNull;
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceExecution.createFragmentInstanceExecution;
+import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.LOCAL_EXECUTION_PLANNER;
 
 public class FragmentInstanceManager {
 
@@ -70,6 +72,8 @@ public class FragmentInstanceManager {
 
   private final ExecutorService intoOperationExecutor;
 
+  private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+
   public static FragmentInstanceManager getInstance() {
     return FragmentInstanceManager.InstanceHolder.INSTANCE;
   }
@@ -101,7 +105,7 @@ public class FragmentInstanceManager {
 
   public FragmentInstanceInfo execDataQueryFragmentInstance(
       FragmentInstance instance, IDataRegionForQuery dataRegion) {
-
+    long startTime = System.nanoTime();
     FragmentInstanceId instanceId = instance.getId();
     try (SetThreadName fragmentInstanceName = new SetThreadName(instanceId.getFullId())) {
       FragmentInstanceExecution execution =
@@ -157,6 +161,8 @@ public class FragmentInstanceManager {
       } else {
         return createFailedInstanceInfo(instanceId);
       }
+    } finally {
+      QUERY_METRICS.recordExecutionCost(LOCAL_EXECUTION_PLANNER, System.nanoTime() - startTime);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryExecutionMetricSet.java b/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryExecutionMetricSet.java
new file mode 100644
index 0000000000..0667bdc623
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryExecutionMetricSet.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricInfo;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class QueryExecutionMetricSet implements IMetricSet {
+
+  public static final Map<String, MetricInfo> metricInfoMap = new HashMap<>();
+
+  public static final String SCHEDULE = "schedule";
+  public static final String WAIT_FOR_DISPATCH = "wait_for_dispatch";
+  public static final String DISPATCH_READ = "dispatch_read";
+
+  static {
+    metricInfoMap.put(
+        SCHEDULE,
+        new MetricInfo(
+            MetricType.TIMER, Metric.DISPATCHER.toString(), Tag.STAGE.toString(), SCHEDULE));
+    metricInfoMap.put(
+        WAIT_FOR_DISPATCH,
+        new MetricInfo(
+            MetricType.TIMER,
+            Metric.DISPATCHER.toString(),
+            Tag.STAGE.toString(),
+            WAIT_FOR_DISPATCH));
+    metricInfoMap.put(
+        DISPATCH_READ,
+        new MetricInfo(
+            MetricType.TIMER, Metric.DISPATCHER.toString(), Tag.STAGE.toString(), DISPATCH_READ));
+  }
+
+  public static final String LOCAL_EXECUTION_PLANNER = "local_execution_planner";
+  public static final String QUERY_RESOURCE_INIT = "query_resource_init";
+  public static final String DRIVER_INTERNAL_PROCESS = "driver_internal_process";
+  public static final String WAIT_FOR_RESULT = "wait_for_result";
+
+  static {
+    metricInfoMap.put(
+        LOCAL_EXECUTION_PLANNER,
+        new MetricInfo(
+            MetricType.TIMER,
+            Metric.QUERY_EXECUTION.toString(),
+            Tag.STAGE.toString(),
+            LOCAL_EXECUTION_PLANNER));
+    metricInfoMap.put(
+        QUERY_RESOURCE_INIT,
+        new MetricInfo(
+            MetricType.TIMER,
+            Metric.QUERY_EXECUTION.toString(),
+            Tag.STAGE.toString(),
+            QUERY_RESOURCE_INIT));
+    metricInfoMap.put(
+        DRIVER_INTERNAL_PROCESS,
+        new MetricInfo(
+            MetricType.TIMER,
+            Metric.QUERY_EXECUTION.toString(),
+            Tag.STAGE.toString(),
+            DRIVER_INTERNAL_PROCESS));
+    metricInfoMap.put(
+        WAIT_FOR_RESULT,
+        new MetricInfo(
+            MetricType.TIMER,
+            Metric.QUERY_EXECUTION.toString(),
+            Tag.STAGE.toString(),
+            WAIT_FOR_RESULT));
+  }
+
+  public static final String AGGREGATION_FROM_RAW_DATA = "aggregation_from_raw_data";
+  public static final String AGGREGATION_FROM_STATISTICS = "aggregation_from_statistics";
+
+  static {
+    metricInfoMap.put(
+        AGGREGATION_FROM_RAW_DATA,
+        new MetricInfo(
+            MetricType.TIMER, Metric.AGGREGATION.toString(), Tag.FROM.toString(), "raw_data"));
+    metricInfoMap.put(
+        AGGREGATION_FROM_STATISTICS,
+        new MetricInfo(
+            MetricType.TIMER, Metric.AGGREGATION.toString(), Tag.FROM.toString(), "statistics"));
+  }
+
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    for (MetricInfo metricInfo : metricInfoMap.values()) {
+      metricService.getOrCreateTimer(
+          metricInfo.getName(), MetricLevel.IMPORTANT, metricInfo.getTagsInArray());
+    }
+  }
+
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    for (MetricInfo metricInfo : metricInfoMap.values()) {
+      metricService.getOrCreateTimer(
+          metricInfo.getName(), MetricLevel.IMPORTANT, metricInfo.getTagsInArray());
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java
index 9292af0885..0cf9bea940 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java
@@ -70,6 +70,16 @@ public class QueryMetricsManager {
         metricInfo.getTagsInArray());
   }
 
+  public void recordExecutionCost(String stage, long costTimeInNanos) {
+    MetricInfo metricInfo = QueryExecutionMetricSet.metricInfoMap.get(stage);
+    metricService.timer(
+        costTimeInNanos,
+        TimeUnit.NANOSECONDS,
+        metricInfo.getName(),
+        MetricLevel.IMPORTANT,
+        metricInfo.getTagsInArray());
+  }
+
   public static QueryMetricsManager getInstance() {
     return QueryMetricsManager.QueryMetricsManagerHolder.INSTANCE;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index f621a4681d..c3890b3235 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -82,6 +82,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Throwables.throwIfUnchecked;
+import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.SCHEDULE;
+import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.WAIT_FOR_RESULT;
 import static org.apache.iotdb.db.mpp.metric.QueryPlanCostMetricSet.DISTRIBUTION_PLANNER;
 import static org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode;
 
@@ -126,6 +128,8 @@ public class QueryExecution implements IQueryExecution {
 
   private AtomicBoolean stopped;
 
+  private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+
   public QueryExecution(
       Statement statement,
       MPPQueryContext context,
@@ -268,7 +272,9 @@ public class QueryExecution implements IQueryExecution {
       this.scheduler.start();
       return;
     }
+
     // TODO: (xingtanzjr) initialize the query scheduler according to configuration
+    long startTime = System.nanoTime();
     this.scheduler =
         config.isClusterMode()
             ? new ClusterScheduler(
@@ -288,6 +294,9 @@ public class QueryExecution implements IQueryExecution {
                 scheduledExecutor,
                 internalServiceClientManager);
     this.scheduler.start();
+    if (rawStatement.isQuery()) {
+      QUERY_METRICS.recordExecutionCost(SCHEDULE, System.nanoTime() - startTime);
+    }
   }
 
   // Use LogicalPlanner to do the logical query plan and logical optimization
@@ -307,8 +316,7 @@ public class QueryExecution implements IQueryExecution {
     this.distributedPlan = planner.planFragments();
 
     if (rawStatement.isQuery()) {
-      QueryMetricsManager.getInstance()
-          .recordPlanCost(DISTRIBUTION_PLANNER, System.nanoTime() - startTime);
+      QUERY_METRICS.recordPlanCost(DISTRIBUTION_PLANNER, System.nanoTime() - startTime);
     }
     if (isQuery() && logger.isDebugEnabled()) {
       logger.debug(
@@ -405,8 +413,14 @@ public class QueryExecution implements IQueryExecution {
           return Optional.empty();
         }
 
-        ListenableFuture<?> blocked = resultHandle.isBlocked();
-        blocked.get();
+        long startTime = System.nanoTime();
+        try {
+          ListenableFuture<?> blocked = resultHandle.isBlocked();
+          blocked.get();
+        } finally {
+          QUERY_METRICS.recordExecutionCost(WAIT_FOR_RESULT, System.nanoTime() - startTime);
+        }
+
         if (!resultHandle.isFinished()) {
           // use the getSerializedTsBlock instead of receive to get ByteBuffer result
           T res = dataSupplier.get();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index fde66f115b..8193be03ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInfo;
+import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -41,6 +42,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 
+import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.WAIT_FOR_DISPATCH;
+
 /**
  * QueryScheduler is used to dispatch the fragment instances of a query to target nodes. And it will
  * continue to collect and monitor the query execution before the query is finished.
@@ -61,6 +64,8 @@ public class ClusterScheduler implements IScheduler {
   private IFragInstanceStateTracker stateTracker;
   private IQueryTerminator queryTerminator;
 
+  private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+
   public ClusterScheduler(
       MPPQueryContext queryContext,
       QueryStateMachine stateMachine,
@@ -103,6 +108,7 @@ public class ClusterScheduler implements IScheduler {
   @Override
   public void start() {
     stateMachine.transitionToDispatching();
+    long startTime = System.nanoTime();
     Future<FragInstanceDispatchResult> dispatchResultFuture = dispatcher.dispatch(instances);
 
     // NOTICE: the FragmentInstance may be dispatched to another Host due to consensus redirect.
@@ -124,6 +130,8 @@ public class ClusterScheduler implements IScheduler {
       }
       stateMachine.transitionToFailed(e);
       return;
+    } finally {
+      QUERY_METRICS.recordExecutionCost(WAIT_FOR_DISPATCH, System.nanoTime() - startTime);
     }
 
     // For the FragmentInstance of WRITE, it will be executed directly when dispatching.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 1ba07ccb79..faecb60172 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.execution.executor.RegionExecutionResult;
 import org.apache.iotdb.db.mpp.execution.executor.RegionReadExecutor;
 import org.apache.iotdb.db.mpp.execution.executor.RegionWriteExecutor;
+import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
@@ -53,6 +54,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.DISPATCH_READ;
 
 public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
 
@@ -67,6 +69,8 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
   private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
       internalServiceClientManager;
 
+  private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+
   public FragmentInstanceDispatcherImpl(
       QueryType type,
       MPPQueryContext queryContext,
@@ -98,6 +102,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
     return executor.submit(
         () -> {
           for (FragmentInstance instance : instances) {
+            long startTime = System.nanoTime();
             try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
               dispatchOneInstance(instance);
             } catch (FragmentInstanceDispatchException e) {
@@ -107,6 +112,8 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
               return new FragInstanceDispatchResult(
                   RpcUtils.getStatus(
                       TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage()));
+            } finally {
+              QUERY_METRICS.recordExecutionCost(DISPATCH_READ, System.nanoTime() - startTime);
             }
           }
           return new FragInstanceDispatchResult(true);