You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/12/20 01:15:47 UTC
[iotdb] 01/05: add more metrics related to query execution
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/addQueryMetrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 34f2712aada80892248f14919f7eaaa5bd753ecb
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Dec 19 10:54:05 2022 +0800
add more metrics related to query execution
---
.../iotdb/commons/service/metric/enums/Metric.java | 5 +-
.../iotdb/db/mpp/aggregation/Aggregator.java | 84 ++++++++------
.../iotdb/db/mpp/execution/driver/DataDriver.java | 39 ++++---
.../iotdb/db/mpp/execution/driver/Driver.java | 5 +
.../fragment/FragmentInstanceManager.java | 8 +-
.../db/mpp/metric/QueryExecutionMetricSet.java | 124 +++++++++++++++++++++
.../iotdb/db/mpp/metric/QueryMetricsManager.java | 10 ++
.../db/mpp/plan/execution/QueryExecution.java | 22 +++-
.../db/mpp/plan/scheduler/ClusterScheduler.java | 8 ++
.../scheduler/FragmentInstanceDispatcherImpl.java | 7 ++
10 files changed, 258 insertions(+), 54 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index e025895606..3d70034cd6 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -65,7 +65,10 @@ public enum Metric {
QUERY_PLAN_COST,
OPERATOR_EXECUTION_COST,
OPERATOR_EXECUTION_COUNT,
- SERIES_SCAN_COST;
+ SERIES_SCAN_COST,
+ DISPATCHER,
+ QUERY_EXECUTION,
+ AGGREGATION;
@Override
public String toString() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
index 9edc32bfa4..16120a1929 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.aggregation;
import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.db.mpp.execution.operator.window.TimeWindow;
+import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -34,6 +35,8 @@ import java.util.Collections;
import java.util.List;
import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA;
+import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.AGGREGATION_FROM_STATISTICS;
public class Aggregator {
@@ -44,6 +47,8 @@ public class Aggregator {
protected IWindow curWindow;
+ protected final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+
// Used for SeriesAggregateScanOperator
public Aggregator(Accumulator accumulator, AggregationStep step) {
this.accumulator = accumulator;
@@ -62,43 +67,53 @@ public class Aggregator {
// Used for SeriesAggregateScanOperator and RawDataAggregateOperator
public int processTsBlock(TsBlock tsBlock) {
- checkArgument(
- step.isInputRaw(),
- "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can only process raw input");
- int lastReadReadIndex = 0;
- for (InputLocation[] inputLocations : inputLocationList) {
+ long startTime = System.nanoTime();
+ try {
checkArgument(
- inputLocations[0].getTsBlockIndex() == 0,
- "RawDataAggregateOperator can only process one tsBlock input.");
- Column[] controlTimeAndValueColumn = new Column[3];
- controlTimeAndValueColumn[0] = curWindow.getControlColumn(tsBlock);
- controlTimeAndValueColumn[1] = tsBlock.getTimeColumn();
- controlTimeAndValueColumn[2] = tsBlock.getColumn(inputLocations[0].getValueColumnIndex());
- lastReadReadIndex =
- Math.max(lastReadReadIndex, accumulator.addInput(controlTimeAndValueColumn, curWindow));
+ step.isInputRaw(),
+ "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can only process raw input");
+ int lastReadReadIndex = 0;
+ for (InputLocation[] inputLocations : inputLocationList) {
+ checkArgument(
+ inputLocations[0].getTsBlockIndex() == 0,
+ "RawDataAggregateOperator can only process one tsBlock input.");
+ Column[] controlTimeAndValueColumn = new Column[3];
+ controlTimeAndValueColumn[0] = curWindow.getControlColumn(tsBlock);
+ controlTimeAndValueColumn[1] = tsBlock.getTimeColumn();
+ controlTimeAndValueColumn[2] = tsBlock.getColumn(inputLocations[0].getValueColumnIndex());
+ lastReadReadIndex =
+ Math.max(lastReadReadIndex, accumulator.addInput(controlTimeAndValueColumn, curWindow));
+ }
+ return lastReadReadIndex;
+ } finally {
+ QUERY_METRICS.recordExecutionCost(AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
}
- return lastReadReadIndex;
}
// Used for AggregateOperator
public void processTsBlocks(TsBlock[] tsBlock) {
- checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot process raw input");
- if (step.isInputFinal()) {
- checkArgument(inputLocationList.size() == 1, "Final output can only be single column");
- Column finalResult =
- tsBlock[inputLocationList.get(0)[0].getTsBlockIndex()].getColumn(
- inputLocationList.get(0)[0].getValueColumnIndex());
- accumulator.setFinal(finalResult);
- } else {
- for (InputLocation[] inputLocations : inputLocationList) {
- Column[] columns = new Column[inputLocations.length];
- for (int i = 0; i < inputLocations.length; i++) {
- columns[i] =
- tsBlock[inputLocations[i].getTsBlockIndex()].getColumn(
- inputLocations[i].getValueColumnIndex());
+ long startTime = System.nanoTime();
+ try {
+ checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot process raw input");
+ if (step.isInputFinal()) {
+ checkArgument(inputLocationList.size() == 1, "Final output can only be single column");
+ Column finalResult =
+ tsBlock[inputLocationList.get(0)[0].getTsBlockIndex()].getColumn(
+ inputLocationList.get(0)[0].getValueColumnIndex());
+ accumulator.setFinal(finalResult);
+ } else {
+ for (InputLocation[] inputLocations : inputLocationList) {
+ Column[] columns = new Column[inputLocations.length];
+ for (int i = 0; i < inputLocations.length; i++) {
+ columns[i] =
+ tsBlock[inputLocations[i].getTsBlockIndex()].getColumn(
+ inputLocations[i].getValueColumnIndex());
+ }
+ accumulator.addIntermediate(columns);
}
- accumulator.addIntermediate(columns);
}
+ } finally {
+ QUERY_METRICS.recordExecutionCost(AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
}
}
@@ -112,9 +127,14 @@ public class Aggregator {
/** Used for SeriesAggregateScanOperator. */
public void processStatistics(Statistics[] statistics) {
- for (InputLocation[] inputLocations : inputLocationList) {
- int valueIndex = inputLocations[0].getValueColumnIndex();
- accumulator.addStatistics(statistics[valueIndex]);
+ long startTime = System.nanoTime();
+ try {
+ for (InputLocation[] inputLocations : inputLocationList) {
+ int valueIndex = inputLocations[0].getValueColumnIndex();
+ accumulator.addStatistics(statistics[valueIndex]);
+ }
+ } finally {
+ QUERY_METRICS.recordExecutionCost(AGGREGATION_FROM_STATISTICS, System.nanoTime() - startTime);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
index 3fc0aaa042..79224545b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
@@ -40,6 +40,8 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
+import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.QUERY_RESOURCE_INIT;
+
/**
* One dataDriver is responsible for one FragmentInstance which is for data query, which may
* contains several series.
@@ -97,23 +99,28 @@ public class DataDriver extends Driver {
* we should change all the blocked lock operation into tryLock
*/
private void initialize() throws QueryProcessException {
- List<DataSourceOperator> sourceOperators =
- ((DataDriverContext) driverContext).getSourceOperators();
- if (sourceOperators != null && !sourceOperators.isEmpty()) {
- QueryDataSource dataSource = initQueryDataSource();
- sourceOperators.forEach(
- sourceOperator -> {
- // construct QueryDataSource for source operator
- QueryDataSource queryDataSource =
- new QueryDataSource(dataSource.getSeqResources(), dataSource.getUnseqResources());
-
- queryDataSource.setDataTTL(dataSource.getDataTTL());
-
- sourceOperator.initQueryDataSource(queryDataSource);
- });
- }
+ long startTime = System.nanoTime();
+ try {
+ List<DataSourceOperator> sourceOperators =
+ ((DataDriverContext) driverContext).getSourceOperators();
+ if (sourceOperators != null && !sourceOperators.isEmpty()) {
+ QueryDataSource dataSource = initQueryDataSource();
+ sourceOperators.forEach(
+ sourceOperator -> {
+ // construct QueryDataSource for source operator
+ QueryDataSource queryDataSource =
+ new QueryDataSource(dataSource.getSeqResources(), dataSource.getUnseqResources());
+
+ queryDataSource.setDataTTL(dataSource.getDataTTL());
+
+ sourceOperator.initQueryDataSource(queryDataSource);
+ });
+ }
- this.init = true;
+ this.init = true;
+ } finally {
+ QUERY_METRICS.recordExecutionCost(QUERY_RESOURCE_INIT, System.nanoTime() - startTime);
+ }
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index 92d8574294..a551174d67 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -47,6 +47,7 @@ import static com.google.common.base.Throwables.throwIfUnchecked;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.lang.Boolean.TRUE;
import static org.apache.iotdb.db.mpp.execution.operator.Operator.NOT_BLOCKED;
+import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.DRIVER_INTERNAL_PROCESS;
public abstract class Driver implements IDriver {
@@ -177,6 +178,7 @@ public abstract class Driver implements IDriver {
}
private ListenableFuture<?> processInternal() {
+ long startTimeNanos = System.nanoTime();
try {
ListenableFuture<?> blocked = root.isBlocked();
if (!blocked.isDone()) {
@@ -209,6 +211,9 @@ public abstract class Driver implements IDriver {
newException.addSuppressed(t);
driverContext.failed(newException);
throw newException;
+ } finally {
+ QUERY_METRICS.recordExecutionCost(
+ DRIVER_INTERNAL_PROCESS, System.nanoTime() - startTimeNanos);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 5e37ac29aa..42a133ecb9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.mpp.execution.driver.DataDriver;
import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
import org.apache.iotdb.db.mpp.execution.schedule.DriverScheduler;
import org.apache.iotdb.db.mpp.execution.schedule.IDriverScheduler;
+import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.mpp.plan.planner.LocalExecutionPlanner;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.utils.SetThreadName;
@@ -47,6 +48,7 @@ import java.util.concurrent.TimeoutException;
import static java.util.Objects.requireNonNull;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceExecution.createFragmentInstanceExecution;
+import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.LOCAL_EXECUTION_PLANNER;
public class FragmentInstanceManager {
@@ -70,6 +72,8 @@ public class FragmentInstanceManager {
private final ExecutorService intoOperationExecutor;
+ private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+
public static FragmentInstanceManager getInstance() {
return FragmentInstanceManager.InstanceHolder.INSTANCE;
}
@@ -101,7 +105,7 @@ public class FragmentInstanceManager {
public FragmentInstanceInfo execDataQueryFragmentInstance(
FragmentInstance instance, IDataRegionForQuery dataRegion) {
-
+ long startTime = System.nanoTime();
FragmentInstanceId instanceId = instance.getId();
try (SetThreadName fragmentInstanceName = new SetThreadName(instanceId.getFullId())) {
FragmentInstanceExecution execution =
@@ -157,6 +161,8 @@ public class FragmentInstanceManager {
} else {
return createFailedInstanceInfo(instanceId);
}
+ } finally {
+ QUERY_METRICS.recordExecutionCost(LOCAL_EXECUTION_PLANNER, System.nanoTime() - startTime);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryExecutionMetricSet.java b/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryExecutionMetricSet.java
new file mode 100644
index 0000000000..0667bdc623
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryExecutionMetricSet.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricInfo;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class QueryExecutionMetricSet implements IMetricSet {
+
+ public static final Map<String, MetricInfo> metricInfoMap = new HashMap<>();
+
+ public static final String SCHEDULE = "schedule";
+ public static final String WAIT_FOR_DISPATCH = "wait_for_dispatch";
+ public static final String DISPATCH_READ = "dispatch_read";
+
+ static {
+ metricInfoMap.put(
+ SCHEDULE,
+ new MetricInfo(
+ MetricType.TIMER, Metric.DISPATCHER.toString(), Tag.STAGE.toString(), SCHEDULE));
+ metricInfoMap.put(
+ WAIT_FOR_DISPATCH,
+ new MetricInfo(
+ MetricType.TIMER,
+ Metric.DISPATCHER.toString(),
+ Tag.STAGE.toString(),
+ WAIT_FOR_DISPATCH));
+ metricInfoMap.put(
+ DISPATCH_READ,
+ new MetricInfo(
+ MetricType.TIMER, Metric.DISPATCHER.toString(), Tag.STAGE.toString(), DISPATCH_READ));
+ }
+
+ public static final String LOCAL_EXECUTION_PLANNER = "local_execution_planner";
+ public static final String QUERY_RESOURCE_INIT = "query_resource_init";
+ public static final String DRIVER_INTERNAL_PROCESS = "driver_internal_process";
+ public static final String WAIT_FOR_RESULT = "wait_for_result";
+
+ static {
+ metricInfoMap.put(
+ LOCAL_EXECUTION_PLANNER,
+ new MetricInfo(
+ MetricType.TIMER,
+ Metric.QUERY_EXECUTION.toString(),
+ Tag.STAGE.toString(),
+ LOCAL_EXECUTION_PLANNER));
+ metricInfoMap.put(
+ QUERY_RESOURCE_INIT,
+ new MetricInfo(
+ MetricType.TIMER,
+ Metric.QUERY_EXECUTION.toString(),
+ Tag.STAGE.toString(),
+ QUERY_RESOURCE_INIT));
+ metricInfoMap.put(
+ DRIVER_INTERNAL_PROCESS,
+ new MetricInfo(
+ MetricType.TIMER,
+ Metric.QUERY_EXECUTION.toString(),
+ Tag.STAGE.toString(),
+ DRIVER_INTERNAL_PROCESS));
+ metricInfoMap.put(
+ WAIT_FOR_RESULT,
+ new MetricInfo(
+ MetricType.TIMER,
+ Metric.QUERY_EXECUTION.toString(),
+ Tag.STAGE.toString(),
+ WAIT_FOR_RESULT));
+ }
+
+ public static final String AGGREGATION_FROM_RAW_DATA = "aggregation_from_raw_data";
+ public static final String AGGREGATION_FROM_STATISTICS = "aggregation_from_statistics";
+
+ static {
+ metricInfoMap.put(
+ AGGREGATION_FROM_RAW_DATA,
+ new MetricInfo(
+ MetricType.TIMER, Metric.AGGREGATION.toString(), Tag.FROM.toString(), "raw_data"));
+ metricInfoMap.put(
+ AGGREGATION_FROM_STATISTICS,
+ new MetricInfo(
+ MetricType.TIMER, Metric.AGGREGATION.toString(), Tag.FROM.toString(), "statistics"));
+ }
+
+ @Override
+ public void bindTo(AbstractMetricService metricService) {
+ for (MetricInfo metricInfo : metricInfoMap.values()) {
+ metricService.getOrCreateTimer(
+ metricInfo.getName(), MetricLevel.IMPORTANT, metricInfo.getTagsInArray());
+ }
+ }
+
+ @Override
+ public void unbindFrom(AbstractMetricService metricService) {
+ for (MetricInfo metricInfo : metricInfoMap.values()) {
+ metricService.getOrCreateTimer(
+ metricInfo.getName(), MetricLevel.IMPORTANT, metricInfo.getTagsInArray());
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java
index 9292af0885..0cf9bea940 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java
@@ -70,6 +70,16 @@ public class QueryMetricsManager {
metricInfo.getTagsInArray());
}
+ public void recordExecutionCost(String stage, long costTimeInNanos) {
+ MetricInfo metricInfo = QueryExecutionMetricSet.metricInfoMap.get(stage);
+ metricService.timer(
+ costTimeInNanos,
+ TimeUnit.NANOSECONDS,
+ metricInfo.getName(),
+ MetricLevel.IMPORTANT,
+ metricInfo.getTagsInArray());
+ }
+
public static QueryMetricsManager getInstance() {
return QueryMetricsManager.QueryMetricsManagerHolder.INSTANCE;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index f621a4681d..c3890b3235 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -82,6 +82,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Throwables.throwIfUnchecked;
+import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.SCHEDULE;
+import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.WAIT_FOR_RESULT;
import static org.apache.iotdb.db.mpp.metric.QueryPlanCostMetricSet.DISTRIBUTION_PLANNER;
import static org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode;
@@ -126,6 +128,8 @@ public class QueryExecution implements IQueryExecution {
private AtomicBoolean stopped;
+ private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+
public QueryExecution(
Statement statement,
MPPQueryContext context,
@@ -268,7 +272,9 @@ public class QueryExecution implements IQueryExecution {
this.scheduler.start();
return;
}
+
// TODO: (xingtanzjr) initialize the query scheduler according to configuration
+ long startTime = System.nanoTime();
this.scheduler =
config.isClusterMode()
? new ClusterScheduler(
@@ -288,6 +294,9 @@ public class QueryExecution implements IQueryExecution {
scheduledExecutor,
internalServiceClientManager);
this.scheduler.start();
+ if (rawStatement.isQuery()) {
+ QUERY_METRICS.recordExecutionCost(SCHEDULE, System.nanoTime() - startTime);
+ }
}
// Use LogicalPlanner to do the logical query plan and logical optimization
@@ -307,8 +316,7 @@ public class QueryExecution implements IQueryExecution {
this.distributedPlan = planner.planFragments();
if (rawStatement.isQuery()) {
- QueryMetricsManager.getInstance()
- .recordPlanCost(DISTRIBUTION_PLANNER, System.nanoTime() - startTime);
+ QUERY_METRICS.recordPlanCost(DISTRIBUTION_PLANNER, System.nanoTime() - startTime);
}
if (isQuery() && logger.isDebugEnabled()) {
logger.debug(
@@ -405,8 +413,14 @@ public class QueryExecution implements IQueryExecution {
return Optional.empty();
}
- ListenableFuture<?> blocked = resultHandle.isBlocked();
- blocked.get();
+ long startTime = System.nanoTime();
+ try {
+ ListenableFuture<?> blocked = resultHandle.isBlocked();
+ blocked.get();
+ } finally {
+ QUERY_METRICS.recordExecutionCost(WAIT_FOR_RESULT, System.nanoTime() - startTime);
+ }
+
if (!resultHandle.isFinished()) {
// use the getSerializedTsBlock instead of receive to get ByteBuffer result
T res = dataSupplier.get();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index fde66f115b..8193be03ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInfo;
+import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -41,6 +42,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
+import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.WAIT_FOR_DISPATCH;
+
/**
* QueryScheduler is used to dispatch the fragment instances of a query to target nodes. And it will
* continue to collect and monitor the query execution before the query is finished.
@@ -61,6 +64,8 @@ public class ClusterScheduler implements IScheduler {
private IFragInstanceStateTracker stateTracker;
private IQueryTerminator queryTerminator;
+ private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+
public ClusterScheduler(
MPPQueryContext queryContext,
QueryStateMachine stateMachine,
@@ -103,6 +108,7 @@ public class ClusterScheduler implements IScheduler {
@Override
public void start() {
stateMachine.transitionToDispatching();
+ long startTime = System.nanoTime();
Future<FragInstanceDispatchResult> dispatchResultFuture = dispatcher.dispatch(instances);
// NOTICE: the FragmentInstance may be dispatched to another Host due to consensus redirect.
@@ -124,6 +130,8 @@ public class ClusterScheduler implements IScheduler {
}
stateMachine.transitionToFailed(e);
return;
+ } finally {
+ QUERY_METRICS.recordExecutionCost(WAIT_FOR_DISPATCH, System.nanoTime() - startTime);
}
// For the FragmentInstance of WRITE, it will be executed directly when dispatching.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 1ba07ccb79..faecb60172 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.execution.executor.RegionExecutionResult;
import org.apache.iotdb.db.mpp.execution.executor.RegionReadExecutor;
import org.apache.iotdb.db.mpp.execution.executor.RegionWriteExecutor;
+import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
@@ -53,6 +54,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.DISPATCH_READ;
public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
@@ -67,6 +69,8 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
internalServiceClientManager;
+ private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+
public FragmentInstanceDispatcherImpl(
QueryType type,
MPPQueryContext queryContext,
@@ -98,6 +102,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
return executor.submit(
() -> {
for (FragmentInstance instance : instances) {
+ long startTime = System.nanoTime();
try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
dispatchOneInstance(instance);
} catch (FragmentInstanceDispatchException e) {
@@ -107,6 +112,8 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
return new FragInstanceDispatchResult(
RpcUtils.getStatus(
TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage()));
+ } finally {
+ QUERY_METRICS.recordExecutionCost(DISPATCH_READ, System.nanoTime() - startTime);
}
}
return new FragInstanceDispatchResult(true);