You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/04/11 15:19:56 UTC
[iotdb] 01/03: add stat
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/groupByTest
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 984b8d3778885a40765943d69b404ca59aedf3ae
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Apr 10 23:02:19 2023 +0800
add stat
---
.../apache/iotdb/commons/conf/IoTDBConstant.java | 1 +
.../iotdb/db/mpp/aggregation/Aggregator.java | 56 ++-
.../iotdb/db/mpp/execution/driver/DataDriver.java | 12 +-
.../iotdb/db/mpp/execution/driver/Driver.java | 24 +-
.../mpp/execution/exchange/SharedTsBlockQueue.java | 42 +-
.../execution/exchange/sink/LocalSinkChannel.java | 13 +
.../exchange/source/LocalSourceHandle.java | 15 +-
.../fragment/FragmentInstanceContext.java | 14 +
.../fragment/FragmentInstanceManager.java | 13 +-
.../db/mpp/execution/operator/OperatorContext.java | 7 +
.../AbstractSeriesAggregationScanOperator.java | 270 +++++++------
.../execution/operator/source/SeriesScanUtil.java | 8 +-
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 12 +-
.../apache/iotdb/db/mpp/plan/analyze/Analyzer.java | 5 +-
.../db/mpp/plan/execution/QueryExecution.java | 15 +-
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 12 +
.../iotdb/db/mpp/plan/planner/LogicalPlanner.java | 6 +-
.../scheduler/FragmentInstanceDispatcherImpl.java | 5 +-
.../iotdb/db/mpp/statistics/QueryStatistics.java | 421 +++++++++++++++++++++
.../service/thrift/impl/ClientRPCServiceImpl.java | 38 +-
.../org/apache/iotdb/db/utils/FileLoaderUtils.java | 31 +-
21 files changed, 833 insertions(+), 187 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index 37321694d2..7c339aad9f 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -95,6 +95,7 @@ public class IoTDBConstant {
public static final String AUDIT_LOGGER_NAME = "IoTDB_AUDIT_LOGGER";
public static final String SLOW_SQL_LOGGER_NAME = "SLOW_SQL";
public static final String COMPACTION_LOGGER_NAME = "COMPACTION";
+ public static final String QUERY_STATISTICS_LOGGER_NAME = "QUERY_STATISTICS";
public static final String IOTDB_JMX_LOCAL = "iotdb.jmx.local";
public static final String IOTDB_JMX_PORT = "com.sun.management.jmxremote.port";
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
index e359ce884a..3842d261a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
@@ -19,9 +19,9 @@
package org.apache.iotdb.db.mpp.aggregation;
-import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -33,8 +33,7 @@ import java.util.Collections;
import java.util.List;
import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA;
-import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.AGGREGATION_FROM_STATISTICS;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.AGGREGATOR_PROCESS_TSBLOCK;
public class Aggregator {
@@ -43,8 +42,6 @@ public class Aggregator {
protected List<InputLocation[]> inputLocationList;
protected final AggregationStep step;
- protected final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
-
// Used for SeriesAggregateScanOperator
public Aggregator(Accumulator accumulator, AggregationStep step) {
this.accumulator = accumulator;
@@ -78,34 +75,30 @@ public class Aggregator {
accumulator.addInput(timeAndValueColumn, bitMap, lastIndex);
}
} finally {
- QUERY_METRICS.recordExecutionCost(AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
+ QueryStatistics.getInstance()
+ .addCost(AGGREGATOR_PROCESS_TSBLOCK, System.nanoTime() - startTime);
}
}
// Used for AggregateOperator
public void processTsBlocks(TsBlock[] tsBlock) {
- long startTime = System.nanoTime();
- try {
- checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot process raw input");
- if (step.isInputFinal()) {
- checkArgument(inputLocationList.size() == 1, "Final output can only be single column");
- Column finalResult =
- tsBlock[inputLocationList.get(0)[0].getTsBlockIndex()].getColumn(
- inputLocationList.get(0)[0].getValueColumnIndex());
- accumulator.setFinal(finalResult);
- } else {
- for (InputLocation[] inputLocations : inputLocationList) {
- Column[] columns = new Column[inputLocations.length];
- for (int i = 0; i < inputLocations.length; i++) {
- columns[i] =
- tsBlock[inputLocations[i].getTsBlockIndex()].getColumn(
- inputLocations[i].getValueColumnIndex());
- }
- accumulator.addIntermediate(columns);
+ checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot process raw input");
+ if (step.isInputFinal()) {
+ checkArgument(inputLocationList.size() == 1, "Final output can only be single column");
+ Column finalResult =
+ tsBlock[inputLocationList.get(0)[0].getTsBlockIndex()].getColumn(
+ inputLocationList.get(0)[0].getValueColumnIndex());
+ accumulator.setFinal(finalResult);
+ } else {
+ for (InputLocation[] inputLocations : inputLocationList) {
+ Column[] columns = new Column[inputLocations.length];
+ for (int i = 0; i < inputLocations.length; i++) {
+ columns[i] =
+ tsBlock[inputLocations[i].getTsBlockIndex()].getColumn(
+ inputLocations[i].getValueColumnIndex());
}
+ accumulator.addIntermediate(columns);
}
- } finally {
- QUERY_METRICS.recordExecutionCost(AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
}
}
@@ -119,14 +112,9 @@ public class Aggregator {
/** Used for SeriesAggregateScanOperator. */
public void processStatistics(Statistics[] statistics) {
- long startTime = System.nanoTime();
- try {
- for (InputLocation[] inputLocations : inputLocationList) {
- int valueIndex = inputLocations[0].getValueColumnIndex();
- accumulator.addStatistics(statistics[valueIndex]);
- }
- } finally {
- QUERY_METRICS.recordExecutionCost(AGGREGATION_FROM_STATISTICS, System.nanoTime() - startTime);
+ for (InputLocation[] inputLocations : inputLocationList) {
+ int valueIndex = inputLocations[0].getValueColumnIndex();
+ accumulator.addStatistics(statistics[valueIndex]);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
index d7378a67b0..e5b106258f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.source.DataSourceOperator;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import com.google.common.util.concurrent.SettableFuture;
@@ -30,6 +31,7 @@ import javax.annotation.concurrent.NotThreadSafe;
import java.util.List;
import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.QUERY_RESOURCE_INIT;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.INIT_SOURCE_OP;
/**
* One dataDriver is responsible for one FragmentInstance which is for data query, which may
@@ -77,6 +79,7 @@ public class DataDriver extends Driver {
// And it's safe for us to throw this exception here in such case.
throw new IllegalStateException("QueryDataSource should never be null!");
}
+ long start = System.nanoTime();
sourceOperators.forEach(
sourceOperator -> {
// construct QueryDataSource for source operator
@@ -87,11 +90,18 @@ public class DataDriver extends Driver {
sourceOperator.initQueryDataSource(queryDataSource);
});
+ driverContext
+ .getFragmentInstanceContext()
+ .addOperationTime(INIT_SOURCE_OP, System.nanoTime() - start);
}
this.init = true;
} finally {
- QUERY_METRICS.recordExecutionCost(QUERY_RESOURCE_INIT, System.nanoTime() - startTime);
+ long costTime = System.nanoTime() - startTime;
+ QUERY_METRICS.recordExecutionCost(QUERY_RESOURCE_INIT, costTime);
+ driverContext
+ .getFragmentInstanceContext()
+ .addOperationTime(QueryStatistics.QUERY_RESOURCE_INIT, costTime);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index c3be5e1bbf..074753bb60 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskId;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.collect.ImmutableList;
@@ -48,6 +49,9 @@ import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.lang.Boolean.TRUE;
import static org.apache.iotdb.db.mpp.execution.operator.Operator.NOT_BLOCKED;
import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.DRIVER_INTERNAL_PROCESS;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.DRIVER_CLOSE;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.SEND_TSBLOCK;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.SET_NO_MORE_TSBLOCK;
public abstract class Driver implements IDriver {
@@ -220,7 +224,11 @@ public abstract class Driver implements IDriver {
if (root.hasNextWithTimer()) {
TsBlock tsBlock = root.nextWithTimer();
if (tsBlock != null && !tsBlock.isEmpty()) {
+ long startTime = System.nanoTime();
sink.send(tsBlock);
+ driverContext
+ .getFragmentInstanceContext()
+ .addOperationTime(SEND_TSBLOCK, System.nanoTime() - startTime);
}
}
return NOT_BLOCKED;
@@ -241,8 +249,11 @@ public abstract class Driver implements IDriver {
driverContext.failed(newException);
throw newException;
} finally {
- QUERY_METRICS.recordExecutionCost(
- DRIVER_INTERNAL_PROCESS, System.nanoTime() - startTimeNanos);
+ long costTime = System.nanoTime() - startTimeNanos;
+ QUERY_METRICS.recordExecutionCost(DRIVER_INTERNAL_PROCESS, costTime);
+ driverContext
+ .getFragmentInstanceContext()
+ .addOperationTime(QueryStatistics.DRIVER_INTERNAL_PROCESS, costTime);
}
}
@@ -371,8 +382,17 @@ public abstract class Driver implements IDriver {
Throwable inFlightException = null;
try {
+ long startTime = System.nanoTime();
root.close();
+ long endTime = System.nanoTime();
+ driverContext
+ .getFragmentInstanceContext()
+ .addOperationTime(DRIVER_CLOSE, endTime - startTime);
+ startTime = endTime;
sink.setNoMoreTsBlocks();
+ driverContext
+ .getFragmentInstanceContext()
+ .addOperationTime(SET_NO_MORE_TSBLOCK, System.nanoTime() - startTime);
// record operator execution statistics to metrics
List<OperatorContext> operatorContexts = driverContext.getOperatorContexts();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index 55b7462922..24ef8d8a08 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.exchange.sink.LocalSinkChannel;
import org.apache.iotdb.db.mpp.execution.exchange.source.LocalSourceHandle;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -41,11 +42,17 @@ import java.util.Queue;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.FREE_MEM;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.NOTIFY_END;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.NOTIFY_NEW_TSBLOCK;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.RESERVE_MEMORY;
/** This is not thread safe class, the caller should ensure multi-threads safety. */
@NotThreadSafe
public class SharedTsBlockQueue {
+ private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance();
+
private static final Logger LOGGER = LoggerFactory.getLogger(SharedTsBlockQueue.class);
private final TFragmentInstanceId localFragmentInstanceId;
@@ -156,7 +163,9 @@ public class SharedTsBlockQueue {
}
this.noMoreTsBlocks = noMoreTsBlocks;
if (!blocked.isDone()) {
+ long startTime = System.nanoTime();
blocked.set(null);
+ QUERY_STATISTICS.addCost(NOTIFY_END, System.nanoTime() - startTime);
}
if (this.sourceHandle != null) {
this.sourceHandle.checkAndInvokeOnFinished();
@@ -177,6 +186,7 @@ public class SharedTsBlockQueue {
if (sinkChannel != null) {
sinkChannel.checkAndInvokeOnFinished();
}
+ long startTime = System.nanoTime();
localMemoryManager
.getQueryPool()
.free(
@@ -184,6 +194,7 @@ public class SharedTsBlockQueue {
fullFragmentInstanceId,
localPlanNodeId,
tsBlock.getRetainedSizeInBytes());
+ QUERY_STATISTICS.addCost(FREE_MEM, System.nanoTime() - startTime);
bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
if (blocked.isDone() && queue.isEmpty() && !noMoreTsBlocks) {
blocked = SettableFuture.create();
@@ -203,17 +214,24 @@ public class SharedTsBlockQueue {
Validate.notNull(tsBlock, "TsBlock cannot be null");
Validate.isTrue(blockedOnMemory == null || blockedOnMemory.isDone(), "queue is full");
- Pair<ListenableFuture<Void>, Boolean> pair =
- localMemoryManager
- .getQueryPool()
- .reserve(
- localFragmentInstanceId.getQueryId(),
- fullFragmentInstanceId,
- localPlanNodeId,
- tsBlock.getRetainedSizeInBytes(),
- maxBytesCanReserve);
- blockedOnMemory = pair.left;
- bufferRetainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
+
+ long startTime = System.nanoTime();
+ Pair<ListenableFuture<Void>, Boolean> pair;
+ try {
+ pair =
+ localMemoryManager
+ .getQueryPool()
+ .reserve(
+ localFragmentInstanceId.getQueryId(),
+ fullFragmentInstanceId,
+ localPlanNodeId,
+ tsBlock.getRetainedSizeInBytes(),
+ maxBytesCanReserve);
+ blockedOnMemory = pair.left;
+ bufferRetainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
+ } finally {
+ QUERY_STATISTICS.addCost(RESERVE_MEMORY, System.nanoTime() - startTime);
+ }
// reserve memory failed, we should wait until there is enough memory
if (!pair.right) {
@@ -230,7 +248,9 @@ public class SharedTsBlockQueue {
} else { // reserve memory succeeded, add the TsBlock directly
queue.add(tsBlock);
if (!blocked.isDone()) {
+ startTime = System.nanoTime();
blocked.set(null);
+ QUERY_STATISTICS.addCost(NOTIFY_NEW_TSBLOCK, System.nanoTime() - startTime);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
index afd043c9af..c35fbbf902 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.execution.exchange.sink;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkListener;
import org.apache.iotdb.db.mpp.execution.exchange.SharedTsBlockQueue;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -34,9 +35,14 @@ import java.util.Optional;
import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
import static org.apache.iotdb.db.mpp.metric.DataExchangeCostMetricSet.SINK_HANDLE_SEND_TSBLOCK_LOCAL;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CHECK_AND_INVOKE_ON_FINISHED;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.SINK_HANDLE_END_LISTENER;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.SINK_HANDLE_FINISH_LISTENER;
public class LocalSinkChannel implements ISinkChannel {
+ private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance();
+
private static final Logger LOGGER = LoggerFactory.getLogger(LocalSinkChannel.class);
private TFragmentInstanceId localFragmentInstanceId;
@@ -102,7 +108,9 @@ public class LocalSinkChannel implements ISinkChannel {
synchronized (queue) {
if (isFinished()) {
synchronized (this) {
+ long start = System.nanoTime();
sinkListener.onFinish(this);
+ QUERY_STATISTICS.addCost(SINK_HANDLE_FINISH_LISTENER, System.nanoTime() - start);
}
}
}
@@ -144,10 +152,15 @@ public class LocalSinkChannel implements ISinkChannel {
return;
}
queue.setNoMoreTsBlocks(true);
+ long startTime = System.nanoTime();
sinkListener.onEndOfBlocks(this);
+ QUERY_STATISTICS.addCost(SINK_HANDLE_END_LISTENER, System.nanoTime() - startTime);
}
}
+
+ long startTime = System.nanoTime();
checkAndInvokeOnFinished();
+ QUERY_STATISTICS.addCost(CHECK_AND_INVOKE_ON_FINISHED, System.nanoTime() - startTime);
LOGGER.debug("[EndSetNoMoreTsBlocksOnLocal]");
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
index 7dc6ad2983..653aa505d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
import org.apache.iotdb.db.mpp.execution.exchange.SharedTsBlockQueue;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -40,11 +41,15 @@ import static com.google.common.util.concurrent.Futures.nonCancellationPropagati
import static org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.createFullIdFrom;
import static org.apache.iotdb.db.mpp.metric.DataExchangeCostMetricSet.SOURCE_HANDLE_DESERIALIZE_TSBLOCK_LOCAL;
import static org.apache.iotdb.db.mpp.metric.DataExchangeCostMetricSet.SOURCE_HANDLE_GET_TSBLOCK_LOCAL;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.LOCAL_SOURCE_HANDLE_GET_TSBLOCK;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.LOCAL_SOURCE_HANDLE_SER_TSBLOCK;
public class LocalSourceHandle implements ISourceHandle {
private static final Logger LOGGER = LoggerFactory.getLogger(LocalSourceHandle.class);
+ private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance();
+
private TFragmentInstanceId localFragmentInstanceId;
private String localPlanNodeId;
private final SourceHandleListener sourceHandleListener;
@@ -121,8 +126,9 @@ public class LocalSourceHandle implements ISourceHandle {
checkAndInvokeOnFinished();
return tsBlock;
} finally {
- QUERY_METRICS.recordDataExchangeCost(
- SOURCE_HANDLE_GET_TSBLOCK_LOCAL, System.nanoTime() - startTime);
+ long costTime = System.nanoTime() - startTime;
+ QUERY_METRICS.recordDataExchangeCost(SOURCE_HANDLE_GET_TSBLOCK_LOCAL, costTime);
+ QUERY_STATISTICS.addCost(LOCAL_SOURCE_HANDLE_GET_TSBLOCK, costTime);
}
}
@@ -136,8 +142,9 @@ public class LocalSourceHandle implements ISourceHandle {
} catch (Exception e) {
throw new IoTDBException(e, TSStatusCode.TSBLOCK_SERIALIZE_ERROR.getStatusCode());
} finally {
- QUERY_METRICS.recordDataExchangeCost(
- SOURCE_HANDLE_DESERIALIZE_TSBLOCK_LOCAL, System.nanoTime() - startTime);
+ long costTime = System.nanoTime() - startTime;
+ QUERY_METRICS.recordDataExchangeCost(SOURCE_HANDLE_DESERIALIZE_TSBLOCK_LOCAL, costTime);
+ QUERY_STATISTICS.addCost(LOCAL_SOURCE_HANDLE_SER_TSBLOCK, costTime);
}
} else {
return null;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index f810bc1ddb..cd7cc1d436 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.idtable.IDTable;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.SessionInfo;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -45,6 +46,9 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.ADD_REFERENCE;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.QUERY_RESOURCE_LIST;
+
public class FragmentInstanceContext extends QueryContext {
private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstanceContext.class);
@@ -83,6 +87,8 @@ public class FragmentInstanceContext extends QueryContext {
// private final AtomicLong endFullGcCount = new AtomicLong(-1);
// private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
+ private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance();
+
public static FragmentInstanceContext createFragmentInstanceContext(
FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo) {
FragmentInstanceContext instanceContext =
@@ -279,6 +285,7 @@ public class FragmentInstanceContext extends QueryContext {
selectedDeviceIdSet.add(translatedPath.getDevice());
}
+ long startTime = System.nanoTime();
this.sharedQueryDataSource =
dataRegion.query(
pathList,
@@ -287,13 +294,16 @@ public class FragmentInstanceContext extends QueryContext {
selectedDeviceIdSet.size() == 1 ? selectedDeviceIdSet.iterator().next() : null,
this,
timeFilter != null ? timeFilter.copy() : null);
+ addOperationTime(QUERY_RESOURCE_LIST, System.nanoTime() - startTime);
// used files should be added before mergeLock is unlocked, or they may be deleted by
// running merge
if (sharedQueryDataSource != null) {
closedFilePaths = new HashSet<>();
unClosedFilePaths = new HashSet<>();
+ startTime = System.nanoTime();
addUsedFilesForQuery(sharedQueryDataSource);
+ addOperationTime(ADD_REFERENCE, System.nanoTime() - startTime);
}
} finally {
dataRegion.readUnlock();
@@ -368,4 +378,8 @@ public class FragmentInstanceContext extends QueryContext {
sourcePaths = null;
sharedQueryDataSource = null;
}
+
+ public void addOperationTime(String key, long costTimeInNanos) {
+ QUERY_STATISTICS.addCost(key, costTimeInNanos);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 6e5390fdc1..366044edf3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.mpp.plan.planner.LocalExecutionPlanner;
import org.apache.iotdb.db.mpp.plan.planner.PipelineDriverFactory;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.db.utils.SetThreadName;
import io.airlift.stats.CounterStat;
@@ -52,6 +53,8 @@ import static java.util.Objects.requireNonNull;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceExecution.createFragmentInstanceExecution;
import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.LOCAL_EXECUTION_PLANNER;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CREATE_FI_CONTEXT;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CREATE_FI_EXEC;
public class FragmentInstanceManager {
@@ -76,6 +79,7 @@ public class FragmentInstanceManager {
private final ExecutorService intoOperationExecutor;
private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+ private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance();
public static FragmentInstanceManager getInstance() {
return FragmentInstanceManager.InstanceHolder.INSTANCE;
@@ -118,6 +122,7 @@ public class FragmentInstanceManager {
FragmentInstanceStateMachine stateMachine =
new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ long start = System.nanoTime();
FragmentInstanceContext context =
instanceContext.computeIfAbsent(
instanceId,
@@ -128,6 +133,7 @@ public class FragmentInstanceManager {
instance.getSessionInfo(),
dataRegion,
instance.getTimeFilter()));
+ QUERY_STATISTICS.addCost(CREATE_FI_CONTEXT, System.nanoTime() - start);
try {
List<PipelineDriverFactory> driverFactories =
@@ -141,6 +147,7 @@ public class FragmentInstanceManager {
// get the sink of last driver
ISink sink = drivers.get(drivers.size() - 1).getSink();
+ start = System.nanoTime();
return createFragmentInstanceExecution(
scheduler,
instanceId,
@@ -154,6 +161,8 @@ public class FragmentInstanceManager {
logger.warn("error when create FragmentInstanceExecution.", t);
stateMachine.failed(t);
return null;
+ } finally {
+ QUERY_STATISTICS.addCost(CREATE_FI_EXEC, System.nanoTime() - start);
}
});
@@ -171,7 +180,9 @@ public class FragmentInstanceManager {
return createFailedInstanceInfo(instanceId);
}
} finally {
- QUERY_METRICS.recordExecutionCost(LOCAL_EXECUTION_PLANNER, System.nanoTime() - startTime);
+ long endTime = System.nanoTime() - startTime;
+ QUERY_METRICS.recordExecutionCost(LOCAL_EXECUTION_PLANNER, endTime);
+ QUERY_STATISTICS.addCost(QueryStatistics.LOCAL_EXECUTION_PLANNER, endTime);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java
index c5c0db783d..7a55772dce 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.mpp.common.SessionInfo;
import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import io.airlift.units.Duration;
@@ -45,6 +46,8 @@ public class OperatorContext {
private long totalExecutionTimeInNanos = 0L;
private long nextCalledCount = 0L;
+ private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance();
+
public OperatorContext(
int operatorId, PlanNodeId planNodeId, String operatorType, DriverContext driverContext) {
this.operatorId = operatorId;
@@ -65,6 +68,10 @@ public class OperatorContext {
this.driverContext = new DriverContext(fragmentInstanceContext, 0);
}
+ public void addOperatorTime(String key, long costTimeInNanos) {
+ QUERY_STATISTICS.addCost(key, costTimeInNanos);
+ }
+
public int getOperatorId() {
return operatorId;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index 67b42a4319..067c14787e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -22,8 +22,10 @@ package org.apache.iotdb.db.mpp.execution.operator.source;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
@@ -41,6 +43,14 @@ import java.util.concurrent.TimeUnit;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateAggregationFromRawData;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.isAllAggregatorsHasFinalResult;
+import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA;
+import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.AGGREGATION_FROM_STATISTICS;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.AGG_SCAN_OPERATOR;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.BUILD_AGG_RES;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CAL_AGG_FROM_CHUNK;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CAL_AGG_FROM_FILE;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CAL_AGG_FROM_PAGE;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CAL_NEXT_AGG_RES;
public abstract class AbstractSeriesAggregationScanOperator extends AbstractDataSourceOperator {
@@ -122,28 +132,33 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData
// start stopwatch
long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
long start = System.nanoTime();
+ try {
+ while (System.nanoTime() - start < maxRuntime
+ && timeRangeIterator.hasNextTimeRange()
+ && !resultTsBlockBuilder.isFull()) {
+ // move to next time window
+ curTimeRange = timeRangeIterator.nextTimeRange();
+
+ // clear previous aggregation result
+ for (Aggregator aggregator : aggregators) {
+ aggregator.reset();
+ }
- while (System.nanoTime() - start < maxRuntime
- && timeRangeIterator.hasNextTimeRange()
- && !resultTsBlockBuilder.isFull()) {
- // move to next time window
- curTimeRange = timeRangeIterator.nextTimeRange();
-
- // clear previous aggregation result
- for (Aggregator aggregator : aggregators) {
- aggregator.reset();
+ long startTime = System.nanoTime();
+ // calculate aggregation result on current time window
+ calculateNextAggregationResult();
+ operatorContext.addOperatorTime(CAL_NEXT_AGG_RES, System.nanoTime() - startTime);
}
- // calculate aggregation result on current time window
- calculateNextAggregationResult();
- }
-
- if (resultTsBlockBuilder.getPositionCount() > 0) {
- TsBlock resultTsBlock = resultTsBlockBuilder.build();
- resultTsBlockBuilder.reset();
- return resultTsBlock;
- } else {
- return null;
+ if (resultTsBlockBuilder.getPositionCount() > 0) {
+ TsBlock resultTsBlock = resultTsBlockBuilder.build();
+ resultTsBlockBuilder.reset();
+ return resultTsBlock;
+ } else {
+ return null;
+ }
+ } finally {
+ operatorContext.addOperatorTime(AGG_SCAN_OPERATOR, System.nanoTime() - start);
}
}
@@ -184,8 +199,10 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData
}
protected void updateResultTsBlock() {
- appendAggregationResult(
- resultTsBlockBuilder, aggregators, timeRangeIterator.currentOutputTime());
+ long startTime = System.nanoTime();
+ try {
+ appendAggregationResult(
+ resultTsBlockBuilder, aggregators, timeRangeIterator.currentOutputTime());
+ } finally {
+ // Record the cost in finally, like the other timed sections added by this patch.
+ operatorContext.addOperatorTime(BUILD_AGG_RES, System.nanoTime() - startTime);
+ }
}
protected boolean calcFromCachedData() {
@@ -193,53 +210,74 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData
}
private boolean calcFromRawData(TsBlock tsBlock) {
- Pair<Boolean, TsBlock> calcResult =
- calculateAggregationFromRawData(tsBlock, aggregators, curTimeRange, ascending);
- inputTsBlock = calcResult.getRight();
- return calcResult.getLeft();
+ long startTime = System.nanoTime();
+ try {
+ Pair<Boolean, TsBlock> calcResult =
+ calculateAggregationFromRawData(tsBlock, aggregators, curTimeRange, ascending);
+ inputTsBlock = calcResult.getRight();
+ return calcResult.getLeft();
+ } finally {
+ // Read the clock once so both sinks record the same cost for this call.
+ long costTime = System.nanoTime() - startTime;
+ operatorContext.addOperatorTime(QueryStatistics.CAL_AGG_FROM_RAW_DATA, costTime);
+ QueryMetricsManager.getInstance().recordExecutionCost(AGGREGATION_FROM_RAW_DATA, costTime);
+ }
}
protected void calcFromStatistics(Statistics[] statistics) {
- for (Aggregator aggregator : aggregators) {
- if (aggregator.hasFinalResult()) {
- continue;
+ long startTime = System.nanoTime();
+ try {
+ for (Aggregator aggregator : aggregators) {
+ if (aggregator.hasFinalResult()) {
+ continue;
+ }
+ aggregator.processStatistics(statistics);
}
- aggregator.processStatistics(statistics);
+ } finally {
+ // Read the clock once so both sinks record the same cost for this call.
+ long costTime = System.nanoTime() - startTime;
+ QueryMetricsManager.getInstance().recordExecutionCost(AGGREGATION_FROM_STATISTICS, costTime);
+ operatorContext.addOperatorTime(QueryStatistics.CAL_AGG_FROM_STAT, costTime);
}
}
protected boolean readAndCalcFromFile() throws IOException {
while (seriesScanUtil.hasNextFile()) {
- if (canUseCurrentFileStatistics()) {
- Statistics fileTimeStatistics = seriesScanUtil.currentFileTimeStatistics();
- if (fileTimeStatistics.getStartTime() > curTimeRange.getMax()) {
- if (ascending) {
- return true;
- } else {
- seriesScanUtil.skipCurrentFile();
- continue;
+ long startTime = System.nanoTime();
+ try {
+ if (canUseCurrentFileStatistics()) {
+ Statistics fileTimeStatistics = seriesScanUtil.currentFileTimeStatistics();
+ if (fileTimeStatistics.getStartTime() > curTimeRange.getMax()) {
+ if (ascending) {
+ return true;
+ } else {
+ seriesScanUtil.skipCurrentFile();
+ continue;
+ }
}
- }
- // calc from fileMetaData
- if (curTimeRange.contains(
- fileTimeStatistics.getStartTime(), fileTimeStatistics.getEndTime())) {
- Statistics[] statisticsList = new Statistics[subSensorSize];
- for (int i = 0; i < subSensorSize; i++) {
- statisticsList[i] = seriesScanUtil.currentFileStatistics(i);
- }
- calcFromStatistics(statisticsList);
- seriesScanUtil.skipCurrentFile();
- if (isAllAggregatorsHasFinalResult(aggregators) && !isGroupByQuery) {
- return true;
- } else {
- continue;
+ // calc from fileMetaData
+ if (curTimeRange.contains(
+ fileTimeStatistics.getStartTime(), fileTimeStatistics.getEndTime())) {
+ Statistics[] statisticsList = new Statistics[subSensorSize];
+ for (int i = 0; i < subSensorSize; i++) {
+ statisticsList[i] = seriesScanUtil.currentFileStatistics(i);
+ }
+ calcFromStatistics(statisticsList);
+ seriesScanUtil.skipCurrentFile();
+ if (isAllAggregatorsHasFinalResult(aggregators) && !isGroupByQuery) {
+ return true;
+ } else {
+ continue;
+ }
}
}
- }
- // read chunk
- if (readAndCalcFromChunk()) {
- return true;
+ // read chunk
+ if (readAndCalcFromChunk()) {
+ return true;
+ }
+ } finally {
+ operatorContext.addOperatorTime(CAL_AGG_FROM_FILE, System.nanoTime() - startTime);
}
}
@@ -248,37 +286,42 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData
protected boolean readAndCalcFromChunk() throws IOException {
while (seriesScanUtil.hasNextChunk()) {
- if (canUseCurrentChunkStatistics()) {
- Statistics chunkTimeStatistics = seriesScanUtil.currentChunkTimeStatistics();
- if (chunkTimeStatistics.getStartTime() > curTimeRange.getMax()) {
- if (ascending) {
- return true;
- } else {
- seriesScanUtil.skipCurrentChunk();
- continue;
+ long startTime = System.nanoTime();
+ try {
+ if (canUseCurrentChunkStatistics()) {
+ Statistics chunkTimeStatistics = seriesScanUtil.currentChunkTimeStatistics();
+ if (chunkTimeStatistics.getStartTime() > curTimeRange.getMax()) {
+ if (ascending) {
+ return true;
+ } else {
+ seriesScanUtil.skipCurrentChunk();
+ continue;
+ }
}
- }
- // calc from chunkMetaData
- if (curTimeRange.contains(
- chunkTimeStatistics.getStartTime(), chunkTimeStatistics.getEndTime())) {
// calc from chunkMetaData
- Statistics[] statisticsList = new Statistics[subSensorSize];
- for (int i = 0; i < subSensorSize; i++) {
- statisticsList[i] = seriesScanUtil.currentChunkStatistics(i);
- }
- calcFromStatistics(statisticsList);
- seriesScanUtil.skipCurrentChunk();
- if (isAllAggregatorsHasFinalResult(aggregators) && !isGroupByQuery) {
- return true;
- } else {
- continue;
+ if (curTimeRange.contains(
+ chunkTimeStatistics.getStartTime(), chunkTimeStatistics.getEndTime())) {
+ // calc from chunkMetaData
+ Statistics[] statisticsList = new Statistics[subSensorSize];
+ for (int i = 0; i < subSensorSize; i++) {
+ statisticsList[i] = seriesScanUtil.currentChunkStatistics(i);
+ }
+ calcFromStatistics(statisticsList);
+ seriesScanUtil.skipCurrentChunk();
+ if (isAllAggregatorsHasFinalResult(aggregators) && !isGroupByQuery) {
+ return true;
+ } else {
+ continue;
+ }
}
}
- }
- // read page
- if (readAndCalcFromPage()) {
- return true;
+ // read page
+ if (readAndCalcFromPage()) {
+ return true;
+ }
+ } finally {
+ operatorContext.addOperatorTime(CAL_AGG_FROM_CHUNK, System.nanoTime() - startTime);
}
}
return false;
@@ -286,43 +329,48 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData
protected boolean readAndCalcFromPage() throws IOException {
while (seriesScanUtil.hasNextPage()) {
- if (canUseCurrentPageStatistics()) {
- Statistics pageTimeStatistics = seriesScanUtil.currentPageTimeStatistics();
- // There is no more eligible points in current time range
- if (pageTimeStatistics.getStartTime() > curTimeRange.getMax()) {
- if (ascending) {
- return true;
- } else {
- seriesScanUtil.skipCurrentPage();
- continue;
- }
- }
- // can use pageHeader
- if (curTimeRange.contains(
- pageTimeStatistics.getStartTime(), pageTimeStatistics.getEndTime())) {
- Statistics[] statisticsList = new Statistics[subSensorSize];
- for (int i = 0; i < subSensorSize; i++) {
- statisticsList[i] = seriesScanUtil.currentPageStatistics(i);
+ long startTime = System.nanoTime();
+ try {
+ if (canUseCurrentPageStatistics()) {
+ Statistics pageTimeStatistics = seriesScanUtil.currentPageTimeStatistics();
+ // There is no more eligible points in current time range
+ if (pageTimeStatistics.getStartTime() > curTimeRange.getMax()) {
+ if (ascending) {
+ return true;
+ } else {
+ seriesScanUtil.skipCurrentPage();
+ continue;
+ }
}
- calcFromStatistics(statisticsList);
- seriesScanUtil.skipCurrentPage();
- if (isAllAggregatorsHasFinalResult(aggregators) && !isGroupByQuery) {
- return true;
- } else {
- continue;
+ // can use pageHeader
+ if (curTimeRange.contains(
+ pageTimeStatistics.getStartTime(), pageTimeStatistics.getEndTime())) {
+ Statistics[] statisticsList = new Statistics[subSensorSize];
+ for (int i = 0; i < subSensorSize; i++) {
+ statisticsList[i] = seriesScanUtil.currentPageStatistics(i);
+ }
+ calcFromStatistics(statisticsList);
+ seriesScanUtil.skipCurrentPage();
+ if (isAllAggregatorsHasFinalResult(aggregators) && !isGroupByQuery) {
+ return true;
+ } else {
+ continue;
+ }
}
}
- }
- // calc from page data
- TsBlock tsBlock = seriesScanUtil.nextPage();
- if (tsBlock == null || tsBlock.isEmpty()) {
- continue;
- }
+ // calc from page data
+ TsBlock tsBlock = seriesScanUtil.nextPage();
+ if (tsBlock == null || tsBlock.isEmpty()) {
+ continue;
+ }
- // calc from raw data
- if (calcFromRawData(tsBlock)) {
- return true;
+ // calc from raw data
+ if (calcFromRawData(tsBlock)) {
+ return true;
+ }
+ } finally {
+ operatorContext.addOperatorTime(CAL_AGG_FROM_PAGE, System.nanoTime() - startTime);
}
}
return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index d42c2ff931..e9a63abaae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.reader.chunk.MemAlignedPageReader;
import org.apache.iotdb.db.query.reader.chunk.MemPageReader;
@@ -66,6 +67,7 @@ import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.BUILD_TSBLO
import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.BUILD_TSBLOCK_FROM_PAGE_READER_ALIGNED_MEM;
import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.BUILD_TSBLOCK_FROM_PAGE_READER_NONALIGNED_DISK;
import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.BUILD_TSBLOCK_FROM_PAGE_READER_NONALIGNED_MEM;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.PAGE_READER;
public class SeriesScanUtil {
@@ -1120,7 +1122,7 @@ public class SeriesScanUtil {
return scanOptions.getGlobalTimeFilter();
}
protected static class VersionPageReader {
private final PriorityMergeReader.MergeReaderPriority version;
private final IPageReader data;
@@ -1164,6 +1166,7 @@ public class SeriesScanUtil {
}
return tsBlock;
} finally {
+ long costTime = System.nanoTime() - startTime;
QUERY_METRICS.recordSeriesScanCost(
isAligned
? (isMem
@@ -1172,7 +1175,8 @@ public class SeriesScanUtil {
: (isMem
? BUILD_TSBLOCK_FROM_PAGE_READER_NONALIGNED_MEM
: BUILD_TSBLOCK_FROM_PAGE_READER_NONALIGNED_DISK),
- System.nanoTime() - startTime);
+ costTime);
+ QueryStatistics.getInstance().addCost(PAGE_READER, costTime);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 939417253c..1494da179a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -133,6 +133,7 @@ import org.apache.iotdb.db.mpp.plan.statement.sys.ExplainStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.ShowQueriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.ShowVersionStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkTypeStatement;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.db.utils.TimePartitionUtils;
@@ -245,8 +246,10 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
} else {
schemaTree = schemaFetcher.fetchSchema(patternTree);
}
- QueryMetricsManager.getInstance()
- .recordPlanCost(SCHEMA_FETCHER, System.nanoTime() - startTime);
+ long costTime = System.nanoTime() - startTime;
+ QueryMetricsManager.getInstance().recordPlanCost(SCHEMA_FETCHER, costTime);
+ QueryStatistics.getInstance().addCost(QueryStatistics.SCHEMA_FETCHER, costTime);
+
logger.debug("[EndFetchSchema]");
// If there is no leaf node in the schema tree, the query should be completed immediately
@@ -1417,8 +1420,9 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
return partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
}
} finally {
- QueryMetricsManager.getInstance()
- .recordPlanCost(PARTITION_FETCHER, System.nanoTime() - startTime);
+ long costTime = System.nanoTime() - startTime;
+ QueryMetricsManager.getInstance().recordPlanCost(PARTITION_FETCHER, costTime);
+ QueryStatistics.getInstance().addCost(QueryStatistics.PARTITION_FETCHER, costTime);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index c9c71b85f6..e235048f9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import static org.apache.iotdb.db.mpp.common.QueryId.mockQueryId;
import static org.apache.iotdb.db.mpp.metric.QueryPlanCostMetricSet.ANALYZER;
@@ -48,7 +49,9 @@ public class Analyzer {
new AnalyzeVisitor(partitionFetcher, schemaFetcher).process(statement, context);
if (statement.isQuery()) {
- QueryMetricsManager.getInstance().recordPlanCost(ANALYZER, System.nanoTime() - startTime);
+ long costTime = System.nanoTime() - startTime;
+ QueryMetricsManager.getInstance().recordPlanCost(ANALYZER, costTime);
+ QueryStatistics.getInstance().addCost(QueryStatistics.ANALYZER, costTime);
}
return analysis;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 4c2d0c23bf..6a030a1512 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -60,6 +60,7 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertBaseStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -133,6 +134,7 @@ public class QueryExecution implements IQueryExecution {
private long totalExecutionTime;
private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+ private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance();
private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
PerformanceOverviewMetrics.getInstance();
@@ -312,7 +314,10 @@ public class QueryExecution implements IQueryExecution {
syncInternalServiceClientManager,
asyncInternalServiceClientManager);
this.scheduler.start();
- PERFORMANCE_OVERVIEW_METRICS.recordScheduleCost(System.nanoTime() - startTime);
+
+ long costTime = System.nanoTime() - startTime;
+ PERFORMANCE_OVERVIEW_METRICS.recordScheduleCost(costTime);
+ QUERY_STATISTICS.addCost(QueryStatistics.DISPATCHER, costTime);
}
// Use LogicalPlanner to do the logical query plan and logical optimization
@@ -334,7 +339,9 @@ public class QueryExecution implements IQueryExecution {
this.distributedPlan = planner.planFragments();
if (rawStatement.isQuery()) {
- QUERY_METRICS.recordPlanCost(DISTRIBUTION_PLANNER, System.nanoTime() - startTime);
+ long costTime = System.nanoTime() - startTime;
+ QUERY_METRICS.recordPlanCost(DISTRIBUTION_PLANNER, costTime);
+ QUERY_STATISTICS.addCost(QueryStatistics.DISTRIBUTION_PLANNER, costTime);
}
if (isQuery() && logger.isDebugEnabled()) {
logger.debug(
@@ -445,7 +452,9 @@ public class QueryExecution implements IQueryExecution {
ListenableFuture<?> blocked = resultHandle.isBlocked();
blocked.get();
} finally {
- QUERY_METRICS.recordExecutionCost(WAIT_FOR_RESULT, System.nanoTime() - startTime);
+ long costTime = System.nanoTime() - startTime;
+ QUERY_METRICS.recordExecutionCost(WAIT_FOR_RESULT, costTime);
+ QUERY_STATISTICS.addCost(QueryStatistics.WAIT_FOR_RESULT, costTime);
}
if (!resultHandle.isFinished()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index d7b28b8cc8..565dc8b159 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -38,6 +39,9 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CHECK_MEMORY;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.NODE_TO_OPERATOR;
+
/**
* Used to plan a fragment instance. Currently, we simply change it from PlanNode to executable
* Operator tree, but in the future, we may split one fragment instance into multiple pipeline to
@@ -45,6 +49,8 @@ import java.util.List;
*/
public class LocalExecutionPlanner {
+ private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance();
+
private static final Logger LOGGER = LoggerFactory.getLogger(LocalExecutionPlanner.class);
/** allocated memory for operator execution */
@@ -62,10 +68,16 @@ public class LocalExecutionPlanner {
// Generate pipelines, return the last pipeline data structure
// TODO Replace operator with operatorFactory to build multiple driver for one pipeline
+ long startTime = System.nanoTime();
Operator root = plan.accept(new OperatorTreeGenerator(), context);
+ long endTime = System.nanoTime();
+ QUERY_STATISTICS.addCost(NODE_TO_OPERATOR, endTime - startTime);
+ startTime = endTime;
// check whether current free memory is enough to execute current query
checkMemory(root, instanceContext.getStateMachine());
+ endTime = System.nanoTime();
+ QUERY_STATISTICS.addCost(CHECK_MEMORY, endTime - startTime);
context.addPipelineDriverFactory(root, context.getDriverContext());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index f55d695b2d..f3298af010 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.optimization.PlanOptimizer;
import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import java.util.List;
@@ -46,8 +47,9 @@ public class LogicalPlanner {
// optimize the query logical plan
if (analysis.getStatement().isQuery()) {
- QueryMetricsManager.getInstance()
- .recordPlanCost(LOGICAL_PLANNER, System.nanoTime() - startTime);
+ long costTime = System.nanoTime() - startTime;
+ QueryMetricsManager.getInstance().recordPlanCost(LOGICAL_PLANNER, costTime);
+ QueryStatistics.getInstance().addCost(QueryStatistics.LOGICAL_PLANNER, costTime);
for (PlanOptimizer optimizer : optimizers) {
rootNode = optimizer.optimize(rootNode, analysis, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index b990e0e4fb..60d2cf8146 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
@@ -122,7 +123,9 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
RpcUtils.getStatus(
TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage())));
} finally {
- QUERY_METRICS.recordExecutionCost(DISPATCH_READ, System.nanoTime() - startTime);
+ long costTime = System.nanoTime() - startTime;
+ QUERY_METRICS.recordExecutionCost(DISPATCH_READ, costTime);
+ QueryStatistics.getInstance().addCost(QueryStatistics.DISPATCH_READ, costTime);
}
}
return immediateFuture(new FragInstanceDispatchResult(true));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java
new file mode 100644
index 0000000000..6661c42ce6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.statistics;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.text.DecimalFormat;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+@ThreadSafe
+public class QueryStatistics {
+
+ private static final long QUERY_STATISTICS_PRINT_INTERVAL_IN_MS = 10_000;
+
+ private static final Logger QUERY_STATISTICS_LOGGER =
+ LoggerFactory.getLogger(IoTDBConstant.QUERY_STATISTICS_LOGGER_NAME);
+
+ private static final DecimalFormat format = new DecimalFormat("#,###");
+
+ private final AtomicBoolean tracing = new AtomicBoolean(true);
+
+ private final Map<String, OperationStatistic> operationStatistics = new ConcurrentHashMap<>();
+
+ public static final String LOCAL_EXECUTION_PLANNER = "LocalExecutionPlanner";
+
+ public static final String CREATE_FI_CONTEXT = "CreateFIContext";
+
+ public static final String CREATE_FI_EXEC = "CreateFIExec";
+
+ public static final String NODE_TO_OPERATOR = "ToOpTree";
+
+ public static final String CHECK_MEMORY = "CheckMem";
+
+ public static final String ALLOC_EX_MEMORY = "AllocExchangeMem";
+
+ public static final String QUERY_EXECUTION = "QueryExecution";
+
+ public static final String QUERY_RESOURCE_INIT = "QueryResourceInit";
+
+ public static final String INIT_SOURCE_OP = "InitSourceOp";
+
+ public static final String QUERY_RESOURCE_LIST = "TsFileList";
+ public static final String ADD_REFERENCE = "AddRef";
+
+ public static final String LOCAL_SOURCE_HANDLE_GET_TSBLOCK = "LocalSourceHandleGetTsBlock";
+
+ public static final String LOCAL_SOURCE_HANDLE_SER_TSBLOCK = "LocalSourceHandleSerializeTsBlock";
+
+ public static final String REMOTE_SOURCE_HANDLE_GET_TSBLOCK = "RemoteSourceHandleGetTsBlock";
+
+ public static final String REMOTE_SOURCE_HANDLE_DESER_TSBLOCK =
+ "RemoteSourceHandleDeserializeTsBlock";
+
+ public static final String WAIT_FOR_RESULT = "WaitForResult";
+
+ public static final String SERIES_SCAN_OPERATOR = "SeriesScanOperator";
+
+ public static final String ALIGNED_SERIES_SCAN_OPERATOR = "AlignedSeriesScanOperator";
+
+ public static final String AGG_SCAN_OPERATOR = "AbstractSeriesAggregationScanOperator";
+
+ public static final String CAL_NEXT_AGG_RES = "CalcNextAggRes";
+
+ public static final String CAL_AGG_FROM_RAW_DATA = "CalcAggFromRawData";
+
+ public static final String CAL_AGG_FROM_STAT = "CalcAggFromStat";
+
+ public static final String AGGREGATOR_PROCESS_TSBLOCK = "AggProcTsBlock";
+
+ public static final String CAL_AGG_FROM_PAGE = "CalcAggFromPage";
+
+ public static final String CAL_AGG_FROM_CHUNK = "CalcAggFromChunk";
+
+ public static final String CAL_AGG_FROM_FILE = "CalcAggFromFile";
+
+ public static final String BUILD_AGG_RES = "BuildAggRes";
+
+ public static final String FILTER_AND_PROJECT_OPERATOR = "FilterAndProjectOperator";
+
+ public static final String SINGLE_INPUT_AGG_OPERATOR = "SingleInputAggregationOperator";
+
+ public static final String PAGE_READER = "IPageReader";
+ public static final String PARSER = "Parser";
+
+ public static final String CREATE_QUERY_EXEC = "CreateQueryExec";
+
+ public static final String SERIALIZE_TSBLOCK = "SerTsBlock";
+
+ public static final String ANALYZER = "Analyzer";
+ public static final String SCHEMA_FETCHER = "SchemaFetcher";
+ public static final String PARTITION_FETCHER = "PartitionFetcher";
+ public static final String LOGICAL_PLANNER = "LogicalPlanner";
+ public static final String DISTRIBUTION_PLANNER = "DistributionPlanner";
+ public static final String DISPATCHER = "Dispatcher";
+
+ public static final String WAIT_FOR_DISPATCH = "WaitForDispatch";
+
+ public static final String DISPATCH_READ = "DispatchRead";
+
+ public static final String DRIVER_CLOSE = "CloseDriver";
+
+ public static final String DRIVER_INTERNAL_PROCESS = "DriverInternalProcess";
+
+ public static final String SEND_TSBLOCK = "SendTsBlock";
+
+ public static final String RESERVE_MEMORY = "ReserveMem";
+
+ public static final String NOTIFY_NEW_TSBLOCK = "NotifyNewTsBlock";
+
+ public static final String NOTIFY_END = "NotifyEnd";
+
+ public static final String FREE_MEM = "FreeMem";
+
+ public static final String SINK_HANDLE_END_LISTENER = "SinkHandleEndListener";
+
+ public static final String SINK_HANDLE_FINISH_LISTENER = "SinkHandleFinishListener";
+
+ public static final String CHECK_AND_INVOKE_ON_FINISHED = "CheckAndInvokeOnFinished";
+
+ public static final String SET_NO_MORE_TSBLOCK = "SetNoMoreTsBlock";
+
+ public static final String SERVER_RPC_RT = "ServerRpcRT";
+
+ public static final String LOAD_TIME_SERIES_METADATA_ALIGNED = "loadTimeSeriesMetadata-aligned";
+ public static final String LOAD_TIME_SERIES_METADATA = "loadTimeSeriesMetadata";
+ public static final String LOAD_CHUNK_METADATA_LIST = "loadChunkMetadataList";
+ public static final String LOAD_PAGE_READER_LIST = "loadPageReaderList";
+ public static final String TIME_SERIES_METADATA_CACHE_MISS = "TimeSeriesMetadataCacheMiss";
+ public static final String CHUNK_CACHE_MISS = "ChunkCacheMiss";
+
+ private QueryStatistics() {
+ ScheduledExecutorService scheduledExecutor =
+ IoTDBThreadPoolFactory.newScheduledThreadPool(1, "Query-Statistics-Print");
+ ScheduledExecutorUtil.safelyScheduleAtFixedRate(
+ scheduledExecutor,
+ this::printQueryStatistics,
+ 0,
+ QUERY_STATISTICS_PRINT_INTERVAL_IN_MS,
+ TimeUnit.MILLISECONDS);
+ }
+
+ /**
+  * Logs one snapshot of all tracked operation statistics, laid out as two call trees (client
+  * connection thread and query execution thread). Does nothing while the tracing flag is off.
+  */
+ private void printQueryStatistics() {
+   if (!tracing.get()) {
+     return;
+   }
+
+   StringBuilder builder = new StringBuilder(System.lineSeparator());
+
+   appendSectionHeader(builder, "Client Connection Thread:");
+   appendStatisticLine(builder, "ServerRpcRT ", SERVER_RPC_RT);
+   appendStatisticLine(builder, "|___CreateQueryExec ", CREATE_QUERY_EXEC);
+   appendStatisticLine(builder, "| |___Parser ", PARSER);
+   appendStatisticLine(builder, "| |___Analyzer ", ANALYZER);
+   appendStatisticLine(builder, "| | |___PartitionFetcher ", PARTITION_FETCHER);
+   appendStatisticLine(builder, "| | |___SchemaFetcher ", SCHEMA_FETCHER);
+   appendStatisticLine(builder, "| |___LogicalPlanner ", LOGICAL_PLANNER);
+   appendStatisticLine(builder, "| |___DistributionPlanner ", DISTRIBUTION_PLANNER);
+   appendStatisticLine(builder, "| |___Dispatcher ", DISPATCHER);
+   appendStatisticLine(builder, "| |___DispatchRead ", DISPATCH_READ);
+   appendStatisticLine(builder, "| |___LocalExecPlanner ", LOCAL_EXECUTION_PLANNER);
+   appendStatisticLine(builder, "| |___FIContext ", CREATE_FI_CONTEXT);
+   appendStatisticLine(builder, "| |___ToOpTree ", NODE_TO_OPERATOR);
+   appendStatisticLine(builder, "| |___CheckMem ", CHECK_MEMORY);
+   appendStatisticLine(builder, "| |___FIExec ", CREATE_FI_EXEC);
+   appendStatisticLine(builder, "|___SerTsBlock ", SERIALIZE_TSBLOCK);
+   appendStatisticLine(builder, " |___WaitForResult ", WAIT_FOR_RESULT);
+   appendStatisticLine(builder, " |___GetTsBlock ", LOCAL_SOURCE_HANDLE_GET_TSBLOCK);
+   appendStatisticLine(builder, " |___FreeMem ", FREE_MEM);
+
+   appendSectionHeader(builder, "Query Execution Thread:");
+   appendStatisticLine(builder, "|___QueryResourceInit ", QUERY_RESOURCE_INIT);
+   appendStatisticLine(builder, "| |___TsFileList ", QUERY_RESOURCE_LIST);
+   appendStatisticLine(builder, "| |___AddRef ", ADD_REFERENCE);
+   appendStatisticLine(builder, "| |___InitSourceOp ", INIT_SOURCE_OP);
+   appendStatisticLine(builder, "|___DriverInternalProcess ", DRIVER_INTERNAL_PROCESS);
+   appendStatisticLine(builder, "| |___AggScanOperator ", AGG_SCAN_OPERATOR);
+   appendStatisticLine(builder, "| | |___AggFromStat ", CAL_AGG_FROM_STAT);
+   appendStatisticLine(builder, "| | |___CalcNextAggRes ", CAL_NEXT_AGG_RES);
+   appendStatisticLine(builder, "| | |___loadTSMeta ", LOAD_TIME_SERIES_METADATA_ALIGNED);
+   appendStatisticLine(builder, "| | |___AggFromFile ", CAL_AGG_FROM_FILE);
+   appendStatisticLine(builder, "| | | |___loadChunkMeta ", LOAD_CHUNK_METADATA_LIST);
+   appendStatisticLine(builder, "| | | |___AggFromChunk ", CAL_AGG_FROM_CHUNK);
+   appendStatisticLine(builder, "| | | |___loadChunk ", LOAD_PAGE_READER_LIST);
+   appendStatisticLine(builder, "| | | |___AggFromPage ", CAL_AGG_FROM_PAGE);
+   appendStatisticLine(builder, "| | | |___loadPage ", PAGE_READER);
+   appendStatisticLine(builder, "| | | |___AggFromRawData ", CAL_AGG_FROM_RAW_DATA);
+   appendStatisticLine(builder, "| | | |___AggProcTsBlock ", AGGREGATOR_PROCESS_TSBLOCK);
+   appendStatisticLine(builder, "| | |___BuildAggRes ", BUILD_AGG_RES);
+   appendStatisticLine(builder, "| |___SendTsBlock ", SEND_TSBLOCK);
+   appendStatisticLine(builder, "| |___ReserveMem ", RESERVE_MEMORY);
+   appendStatisticLine(builder, "| |___NotifyNewTsBlock ", NOTIFY_NEW_TSBLOCK);
+   appendStatisticLine(builder, "|___SetNoMoreTsBlock ", SET_NO_MORE_TSBLOCK);
+   appendStatisticLine(builder, " |___NotifyEnd ", NOTIFY_END);
+   appendStatisticLine(builder, " |___EndListener ", SINK_HANDLE_END_LISTENER);
+   appendStatisticLine(builder, " |___CkAndInvOnFinished ", CHECK_AND_INVOKE_ON_FINISHED);
+   appendStatisticLine(builder, " |___FinishListener ", SINK_HANDLE_FINISH_LISTENER);
+
+   QUERY_STATISTICS_LOGGER.info(builder.toString());
+   // Trailing empty record; presumably a visual separator between snapshots.
+   QUERY_STATISTICS_LOGGER.info("");
+ }
+
+ /** Appends a section title followed by one empty line. */
+ private static void appendSectionHeader(StringBuilder builder, String title) {
+   builder.append(title).append(System.lineSeparator()).append(System.lineSeparator());
+ }
+
+ /**
+  * Appends one row made of the tree label and the statistic stored under {@code key}. A key
+  * with no recorded sample is rendered as "null", because the lookup result is appended as-is.
+  */
+ private void appendStatisticLine(StringBuilder builder, String label, String key) {
+   builder.append(label).append(operationStatistics.get(key)).append(System.lineSeparator());
+ }
+
+ /**
+  * Returns the shared {@code QueryStatistics} object kept by {@code QueryStatisticsHolder}.
+  *
+  * @return the single instance; the same object on every call
+  */
+ public static QueryStatistics getInstance() {
+   return QueryStatisticsHolder.INSTANCE;
+ }
+
+ /**
+  * Adds one timing sample to the statistic stored under {@code key}, creating that statistic
+  * on first use. Does nothing while the tracing flag is off.
+  *
+  * @param key name of the operation the sample belongs to
+  * @param costTimeInNanos elapsed time of the sample, in nanoseconds
+  */
+ public void addCost(String key, long costTimeInNanos) {
+   if (!tracing.get()) {
+     return;
+   }
+   OperationStatistic statistic =
+       operationStatistics.computeIfAbsent(key, name -> new OperationStatistic());
+   statistic.addTimeCost(costTimeInNanos);
+ }
+
+ /** Accumulated elapsed time and sample count of a single operation. */
+ private static class OperationStatistic {
+   // accumulated operation time in ns
+   private final AtomicLong totalTime;
+   // number of samples added through addTimeCost
+   private final AtomicLong totalCount;
+
+   public OperationStatistic() {
+     this.totalTime = new AtomicLong(0);
+     this.totalCount = new AtomicLong(0);
+   }
+
+   /**
+    * Records one sample.
+    *
+    * @param costTimeInNanos elapsed time of the sample, in nanoseconds
+    */
+   public void addTimeCost(long costTimeInNanos) {
+     totalTime.addAndGet(costTimeInNanos);
+     totalCount.incrementAndGet();
+   }
+
+   /**
+    * Renders total time (microseconds), sample count and average time per sample
+    * (microseconds, integer division).
+    */
+   @Override
+   public String toString() {
+     long time = totalTime.get() / 1_000;
+     long count = totalCount.get();
+     // The counter starts at 0, so an instance rendered before its first sample has been
+     // added would otherwise fail with a division by zero; report an average of 0 instead.
+     long average = count == 0 ? 0 : time / count;
+     return "{"
+         + "totalTime="
+         + format.format(time)
+         + "us"
+         + ", totalCount="
+         + format.format(count)
+         + ", avgOpTime="
+         + format.format(average)
+         + "us"
+         + '}';
+   }
+ }
+
+ /** Keeps the single {@code QueryStatistics} object that {@code getInstance()} returns. */
+ private static final class QueryStatisticsHolder {
+
+   private static final QueryStatistics INSTANCE = new QueryStatistics();
+
+   // Not meant to be instantiated; it only carries the static field above.
+   private QueryStatisticsHolder() {}
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index e06d03ea76..7458e9d6d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.template.CreateSchemaTemp
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.DropSchemaTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.SetSchemaTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.UnsetSchemaTemplateStatement;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.query.control.clientsession.IClientSession;
import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
@@ -133,6 +134,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.SERVER_RPC_RT;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
@@ -164,16 +166,22 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
private static final SelectResult SELECT_RESULT =
(resp, queryExecution, fetchSize) -> {
+ long startTime = System.nanoTime();
Pair<List<ByteBuffer>, Boolean> pair =
QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, fetchSize);
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.SERIALIZE_TSBLOCK, System.nanoTime() - startTime);
resp.setQueryResult(pair.left);
return pair.right;
};
private static final SelectResult OLD_SELECT_RESULT =
(resp, queryExecution, fetchSize) -> {
+ long startTime = System.nanoTime();
Pair<TSQueryDataSet, Boolean> pair =
QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, fetchSize);
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.SERIALIZE_TSBLOCK, System.nanoTime() - startTime);
resp.setQueryDataSet(pair.left);
return pair.right;
};
@@ -194,6 +202,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
}
long startTime = System.currentTimeMillis();
+ long startTimeInNano = System.nanoTime();
StatementType statementType = null;
try {
Statement s = StatementGenerator.createStatement(statement, clientSession.getZoneId());
@@ -203,6 +212,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
RpcUtils.getStatus(
TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported"));
}
+
+ if (s.isQuery()) {
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.PARSER, System.nanoTime() - startTimeInNano);
+ }
+
// permission check
TSStatus status = AuthorityChecker.checkAuthority(s, clientSession);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -215,6 +230,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
}
queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
+ long start = System.nanoTime();
// create and cache dataset
ExecutionResult result =
COORDINATOR.execute(
@@ -225,6 +241,10 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
partitionFetcher,
schemaFetcher,
req.getTimeout());
+ if (s.isQuery()) {
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.CREATE_QUERY_EXEC, System.nanoTime() - start);
+ }
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
@@ -469,7 +489,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSExecuteStatementResp executeQueryStatementV2(TSExecuteStatementReq req) {
- return executeStatementV2(req);
+ long startTime = System.nanoTime();
+ try {
+ return executeStatementV2(req);
+ } finally {
+ QueryStatistics.getInstance().addCost(SERVER_RPC_RT, System.nanoTime() - startTime);
+ }
}
@Override
@@ -499,6 +524,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) {
+ long startTimeNanos = System.nanoTime();
long startTime = System.currentTimeMillis();
boolean finished = false;
StatementType statementType = null;
@@ -545,6 +571,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
COORDINATOR.cleanupQueryExecution(req.queryId);
}
SESSION_MANAGER.updateIdleTime();
+ QueryStatistics.getInstance().addCost(SERVER_RPC_RT, System.nanoTime() - startTimeNanos);
}
}
@@ -1000,7 +1027,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) {
- return executeStatement(req);
+ long startTime = System.nanoTime();
+ try {
+ return executeStatement(req);
+ } finally {
+ QueryStatistics.getInstance().addCost(SERVER_RPC_RT, System.nanoTime() - startTime);
+ }
}
@Override
@@ -1011,6 +1043,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
boolean finished = false;
+ long startTimeNanos = System.nanoTime();
long startTime = System.currentTimeMillis();
StatementType statementType = null;
try {
@@ -1056,6 +1089,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
COORDINATOR.cleanupQueryExecution(req.queryId);
}
SESSION_MANAGER.updateIdleTime();
+ QueryStatistics.getInstance().addCost(SERVER_RPC_RT, System.nanoTime() - startTimeNanos);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 1dc6500ac4..5387aafbf0 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.reader.chunk.metadata.DiskAlignedChunkMetadataLoader;
import org.apache.iotdb.db.query.reader.chunk.metadata.DiskChunkMetadataLoader;
@@ -175,11 +176,12 @@ public class FileLoaderUtils {
}
return timeSeriesMetadata;
} finally {
+ long costTime = System.nanoTime() - t1;
QUERY_METRICS.recordSeriesScanCost(
loadFromMem
? LOAD_TIMESERIES_METADATA_NONALIGNED_MEM
: LOAD_TIMESERIES_METADATA_NONALIGNED_DISK,
- System.nanoTime() - t1);
+ costTime);
}
}
@@ -286,11 +288,14 @@ public class FileLoaderUtils {
}
return alignedTimeSeriesMetadata;
} finally {
+ long costTime = System.nanoTime() - t1;
QUERY_METRICS.recordSeriesScanCost(
loadFromMem
? LOAD_TIMESERIES_METADATA_ALIGNED_MEM
: LOAD_TIMESERIES_METADATA_ALIGNED_DISK,
- System.nanoTime() - t1);
+ costTime);
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.LOAD_TIME_SERIES_METADATA_ALIGNED, costTime);
}
}
@@ -301,7 +306,11 @@ public class FileLoaderUtils {
*/
public static List<IChunkMetadata> loadChunkMetadataList(ITimeSeriesMetadata timeSeriesMetadata)
throws IOException {
- return timeSeriesMetadata.loadChunkMetadataList();
+ long startTime = System.nanoTime();
+ List<IChunkMetadata> chunkMetadataList = timeSeriesMetadata.loadChunkMetadataList();
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.LOAD_CHUNK_METADATA_LIST, System.nanoTime() - startTime);
+ return chunkMetadataList;
}
/**
@@ -312,11 +321,17 @@ public class FileLoaderUtils {
*/
public static List<IPageReader> loadPageReaderList(
IChunkMetadata chunkMetaData, Filter timeFilter) throws IOException {
- if (chunkMetaData == null) {
- throw new IOException("Can't init null chunkMeta");
+ long startTime = System.nanoTime();
+ try {
+ if (chunkMetaData == null) {
+ throw new IOException("Can't init null chunkMeta");
+ }
+ IChunkLoader chunkLoader = chunkMetaData.getChunkLoader();
+ IChunkReader chunkReader = chunkLoader.getChunkReader(chunkMetaData, timeFilter);
+ return chunkReader.loadPageReaderList();
+ } finally {
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.LOAD_PAGE_READER_LIST, System.nanoTime() - startTime);
}
- IChunkLoader chunkLoader = chunkMetaData.getChunkLoader();
- IChunkReader chunkReader = chunkLoader.getChunkReader(chunkMetaData, timeFilter);
- return chunkReader.loadPageReaderList();
}
}