You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/05/14 14:06:31 UTC
[iotdb] branch lmh/queryPerformanceTest1.1 updated: cp from master
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/queryPerformanceTest1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/lmh/queryPerformanceTest1.1 by this push:
new 0167c4b0949 cp from master
0167c4b0949 is described below
commit 0167c4b0949c90e42b133b2a348d9a6f698e1678
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Sun May 14 22:06:24 2023 +0800
cp from master
---
.../apache/iotdb/commons/conf/IoTDBConstant.java | 1 +
.../apache/iotdb/db/engine/cache/ChunkCache.java | 5 +-
.../iotdb/db/mpp/aggregation/Aggregator.java | 56 +--
.../iotdb/db/mpp/execution/driver/DataDriver.java | 12 +-
.../iotdb/db/mpp/execution/driver/Driver.java | 18 +-
.../mpp/execution/exchange/SharedTsBlockQueue.java | 42 +-
.../execution/exchange/sink/LocalSinkChannel.java | 13 +
.../exchange/source/LocalSourceHandle.java | 15 +-
.../fragment/FragmentInstanceContext.java | 14 +
.../fragment/FragmentInstanceManager.java | 13 +-
.../db/mpp/execution/operator/OperatorContext.java | 7 +
.../operator/sink/IdentitySinkOperator.java | 4 +-
.../AbstractSeriesAggregationScanOperator.java | 320 ++++++++-----
.../operator/source/AlignedSeriesScanUtil.java | 122 ++---
.../execution/operator/source/SeriesScanUtil.java | 453 +++++++++++--------
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 12 +-
.../apache/iotdb/db/mpp/plan/analyze/Analyzer.java | 5 +-
.../db/mpp/plan/execution/QueryExecution.java | 15 +-
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 13 +-
.../iotdb/db/mpp/plan/planner/LogicalPlanner.java | 6 +-
.../scheduler/FragmentInstanceDispatcherImpl.java | 5 +-
.../iotdb/db/mpp/statistics/QueryStatistics.java | 498 +++++++++++++++++++++
.../query/reader/chunk/DiskAlignedChunkLoader.java | 5 +-
.../db/query/reader/chunk/DiskChunkLoader.java | 5 +-
.../universal/AlignedPriorityMergeReader.java | 9 +-
.../reader/universal/PriorityMergeReader.java | 109 ++---
.../service/thrift/impl/ClientRPCServiceImpl.java | 38 +-
.../org/apache/iotdb/db/utils/FileLoaderUtils.java | 30 +-
28 files changed, 1344 insertions(+), 501 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index 628fdbf8a23..a7775b03436 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -87,6 +87,7 @@ public class IoTDBConstant {
public static final String AUDIT_LOGGER_NAME = "IoTDB_AUDIT_LOGGER";
public static final String SLOW_SQL_LOGGER_NAME = "SLOW_SQL";
public static final String COMPACTION_LOGGER_NAME = "COMPACTION";
+ public static final String QUERY_STATISTICS_LOGGER_NAME = "QUERY_STATISTICS";
public static final String IOTDB_JMX_LOCAL = "iotdb.jmx.local";
public static final String IOTDB_JMX_PORT = "com.sun.management.jmxremote.port";
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java
index 5564df780c2..0258522c099 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.metric.ChunkCacheMetrics;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -136,7 +137,9 @@ public class ChunkCache {
chunkMetaData.getDeleteIntervalList(),
chunkMetaData.getStatistics());
} finally {
- QUERY_METRICS.recordSeriesScanCost(READ_CHUNK_ALL, System.nanoTime() - startTime);
+ long costTime = System.nanoTime() - startTime;
+ QUERY_METRICS.recordSeriesScanCost(READ_CHUNK_ALL, costTime);
+ QueryStatistics.getInstance().addCost(QueryStatistics.LOAD_CHUNK, costTime);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
index e359ce884aa..3842d261a01 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
@@ -19,9 +19,9 @@
package org.apache.iotdb.db.mpp.aggregation;
-import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -33,8 +33,7 @@ import java.util.Collections;
import java.util.List;
import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA;
-import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.AGGREGATION_FROM_STATISTICS;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.AGGREGATOR_PROCESS_TSBLOCK;
public class Aggregator {
@@ -43,8 +42,6 @@ public class Aggregator {
protected List<InputLocation[]> inputLocationList;
protected final AggregationStep step;
- protected final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
-
// Used for SeriesAggregateScanOperator
public Aggregator(Accumulator accumulator, AggregationStep step) {
this.accumulator = accumulator;
@@ -78,34 +75,30 @@ public class Aggregator {
accumulator.addInput(timeAndValueColumn, bitMap, lastIndex);
}
} finally {
- QUERY_METRICS.recordExecutionCost(AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
+ QueryStatistics.getInstance()
+ .addCost(AGGREGATOR_PROCESS_TSBLOCK, System.nanoTime() - startTime);
}
}
// Used for AggregateOperator
public void processTsBlocks(TsBlock[] tsBlock) {
- long startTime = System.nanoTime();
- try {
- checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot process raw input");
- if (step.isInputFinal()) {
- checkArgument(inputLocationList.size() == 1, "Final output can only be single column");
- Column finalResult =
- tsBlock[inputLocationList.get(0)[0].getTsBlockIndex()].getColumn(
- inputLocationList.get(0)[0].getValueColumnIndex());
- accumulator.setFinal(finalResult);
- } else {
- for (InputLocation[] inputLocations : inputLocationList) {
- Column[] columns = new Column[inputLocations.length];
- for (int i = 0; i < inputLocations.length; i++) {
- columns[i] =
- tsBlock[inputLocations[i].getTsBlockIndex()].getColumn(
- inputLocations[i].getValueColumnIndex());
- }
- accumulator.addIntermediate(columns);
+ checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot process raw input");
+ if (step.isInputFinal()) {
+ checkArgument(inputLocationList.size() == 1, "Final output can only be single column");
+ Column finalResult =
+ tsBlock[inputLocationList.get(0)[0].getTsBlockIndex()].getColumn(
+ inputLocationList.get(0)[0].getValueColumnIndex());
+ accumulator.setFinal(finalResult);
+ } else {
+ for (InputLocation[] inputLocations : inputLocationList) {
+ Column[] columns = new Column[inputLocations.length];
+ for (int i = 0; i < inputLocations.length; i++) {
+ columns[i] =
+ tsBlock[inputLocations[i].getTsBlockIndex()].getColumn(
+ inputLocations[i].getValueColumnIndex());
}
+ accumulator.addIntermediate(columns);
}
- } finally {
- QUERY_METRICS.recordExecutionCost(AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
}
}
@@ -119,14 +112,9 @@ public class Aggregator {
/** Used for SeriesAggregateScanOperator. */
public void processStatistics(Statistics[] statistics) {
- long startTime = System.nanoTime();
- try {
- for (InputLocation[] inputLocations : inputLocationList) {
- int valueIndex = inputLocations[0].getValueColumnIndex();
- accumulator.addStatistics(statistics[valueIndex]);
- }
- } finally {
- QUERY_METRICS.recordExecutionCost(AGGREGATION_FROM_STATISTICS, System.nanoTime() - startTime);
+ for (InputLocation[] inputLocations : inputLocationList) {
+ int valueIndex = inputLocations[0].getValueColumnIndex();
+ accumulator.addStatistics(statistics[valueIndex]);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
index d7378a67b00..e5b106258fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.source.DataSourceOperator;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import com.google.common.util.concurrent.SettableFuture;
@@ -30,6 +31,7 @@ import javax.annotation.concurrent.NotThreadSafe;
import java.util.List;
import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.QUERY_RESOURCE_INIT;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.INIT_SOURCE_OP;
/**
* One dataDriver is responsible for one FragmentInstance which is for data query, which may
@@ -77,6 +79,7 @@ public class DataDriver extends Driver {
// And it's safe for us to throw this exception here in such case.
throw new IllegalStateException("QueryDataSource should never be null!");
}
+ long start = System.nanoTime();
sourceOperators.forEach(
sourceOperator -> {
// construct QueryDataSource for source operator
@@ -87,11 +90,18 @@ public class DataDriver extends Driver {
sourceOperator.initQueryDataSource(queryDataSource);
});
+ driverContext
+ .getFragmentInstanceContext()
+ .addOperationTime(INIT_SOURCE_OP, System.nanoTime() - start);
}
this.init = true;
} finally {
- QUERY_METRICS.recordExecutionCost(QUERY_RESOURCE_INIT, System.nanoTime() - startTime);
+ long costTime = System.nanoTime() - startTime;
+ QUERY_METRICS.recordExecutionCost(QUERY_RESOURCE_INIT, costTime);
+ driverContext
+ .getFragmentInstanceContext()
+ .addOperationTime(QueryStatistics.QUERY_RESOURCE_INIT, costTime);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index f92650bcde3..045bcdfcbf0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskId;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.collect.ImmutableList;
@@ -48,6 +49,8 @@ import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.lang.Boolean.TRUE;
import static org.apache.iotdb.db.mpp.execution.operator.Operator.NOT_BLOCKED;
import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.DRIVER_INTERNAL_PROCESS;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.SEND_TSBLOCK;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.SET_NO_MORE_TSBLOCK;
public abstract class Driver implements IDriver {
@@ -220,7 +223,11 @@ public abstract class Driver implements IDriver {
if (root.hasNextWithTimer()) {
TsBlock tsBlock = root.nextWithTimer();
if (tsBlock != null && !tsBlock.isEmpty()) {
+ long startTime = System.nanoTime();
sink.send(tsBlock);
+ driverContext
+ .getFragmentInstanceContext()
+ .addOperationTime(SEND_TSBLOCK, System.nanoTime() - startTime);
}
}
return NOT_BLOCKED;
@@ -241,8 +248,11 @@ public abstract class Driver implements IDriver {
driverContext.failed(newException);
throw newException;
} finally {
- QUERY_METRICS.recordExecutionCost(
- DRIVER_INTERNAL_PROCESS, System.nanoTime() - startTimeNanos);
+ long costTime = System.nanoTime() - startTimeNanos;
+ QUERY_METRICS.recordExecutionCost(DRIVER_INTERNAL_PROCESS, costTime);
+ driverContext
+ .getFragmentInstanceContext()
+ .addOperationTime(QueryStatistics.DRIVER_INTERNAL_PROCESS, costTime);
}
}
@@ -371,8 +381,12 @@ public abstract class Driver implements IDriver {
Throwable inFlightException = null;
try {
+ long startTime = System.nanoTime();
root.close();
sink.setNoMoreTsBlocks();
+ driverContext
+ .getFragmentInstanceContext()
+ .addOperationTime(SET_NO_MORE_TSBLOCK, System.nanoTime() - startTime);
// record operator execution statistics to metrics
List<OperatorContext> operatorContexts = driverContext.getOperatorContexts();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index f5f1884f690..1105d5ef68f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.exchange.sink.LocalSinkChannel;
import org.apache.iotdb.db.mpp.execution.exchange.source.LocalSourceHandle;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -41,11 +42,17 @@ import java.util.Queue;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.FREE_MEM;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.NOTIFY_END;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.NOTIFY_NEW_TSBLOCK;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.RESERVE_MEMORY;
/** This is not thread safe class, the caller should ensure multi-threads safety. */
@NotThreadSafe
public class SharedTsBlockQueue {
+ private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance();
+
private static final Logger LOGGER = LoggerFactory.getLogger(SharedTsBlockQueue.class);
private final TFragmentInstanceId localFragmentInstanceId;
@@ -156,7 +163,9 @@ public class SharedTsBlockQueue {
}
this.noMoreTsBlocks = noMoreTsBlocks;
if (!blocked.isDone()) {
+ long startTime = System.nanoTime();
blocked.set(null);
+ QUERY_STATISTICS.addCost(NOTIFY_END, System.nanoTime() - startTime);
}
if (this.sourceHandle != null) {
this.sourceHandle.checkAndInvokeOnFinished();
@@ -172,6 +181,7 @@ public class SharedTsBlockQueue {
throw new IllegalStateException("queue has been destroyed");
}
TsBlock tsBlock = queue.remove();
+ long startTime = System.nanoTime();
localMemoryManager
.getQueryPool()
.free(
@@ -179,6 +189,7 @@ public class SharedTsBlockQueue {
fullFragmentInstanceId,
localPlanNodeId,
tsBlock.getRetainedSizeInBytes());
+ QUERY_STATISTICS.addCost(FREE_MEM, System.nanoTime() - startTime);
bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
// Every time LocalSourceHandle consumes a TsBlock, it needs to send the event to
// corresponding LocalSinkChannel.
@@ -205,17 +216,24 @@ public class SharedTsBlockQueue {
Validate.isTrue(
blockedOnMemory == null || blockedOnMemory.isDone(), "SharedTsBlockQueue is full");
- Pair<ListenableFuture<Void>, Boolean> pair =
- localMemoryManager
- .getQueryPool()
- .reserve(
- localFragmentInstanceId.getQueryId(),
- fullFragmentInstanceId,
- localPlanNodeId,
- tsBlock.getRetainedSizeInBytes(),
- maxBytesCanReserve);
- blockedOnMemory = pair.left;
- bufferRetainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
+
+ long startTime = System.nanoTime();
+ Pair<ListenableFuture<Void>, Boolean> pair;
+ try {
+ pair =
+ localMemoryManager
+ .getQueryPool()
+ .reserve(
+ localFragmentInstanceId.getQueryId(),
+ fullFragmentInstanceId,
+ localPlanNodeId,
+ tsBlock.getRetainedSizeInBytes(),
+ maxBytesCanReserve);
+ blockedOnMemory = pair.left;
+ bufferRetainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
+ } finally {
+ QUERY_STATISTICS.addCost(RESERVE_MEMORY, System.nanoTime() - startTime);
+ }
// reserve memory failed, we should wait until there is enough memory
if (!pair.right) {
@@ -232,7 +250,9 @@ public class SharedTsBlockQueue {
} else { // reserve memory succeeded, add the TsBlock directly
queue.add(tsBlock);
if (!blocked.isDone()) {
+ startTime = System.nanoTime();
blocked.set(null);
+ QUERY_STATISTICS.addCost(NOTIFY_NEW_TSBLOCK, System.nanoTime() - startTime);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
index aae1f8f37c7..a2098376be4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.execution.exchange.sink;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkListener;
import org.apache.iotdb.db.mpp.execution.exchange.SharedTsBlockQueue;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -35,9 +36,14 @@ import java.util.Optional;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
import static org.apache.iotdb.db.mpp.metric.DataExchangeCostMetricSet.SINK_HANDLE_SEND_TSBLOCK_LOCAL;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CHECK_AND_INVOKE_ON_FINISHED;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.SINK_HANDLE_END_LISTENER;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.SINK_HANDLE_FINISH_LISTENER;
public class LocalSinkChannel implements ISinkChannel {
+ private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance();
+
private static final Logger LOGGER = LoggerFactory.getLogger(LocalSinkChannel.class);
private TFragmentInstanceId localFragmentInstanceId;
@@ -109,8 +115,10 @@ public class LocalSinkChannel implements ISinkChannel {
if (isFinished()) {
synchronized (this) {
if (!invokedOnFinished) {
+ long start = System.nanoTime();
sinkListener.onFinish(this);
invokedOnFinished = true;
+ QUERY_STATISTICS.addCost(SINK_HANDLE_FINISH_LISTENER, System.nanoTime() - start);
}
}
}
@@ -156,10 +164,15 @@ public class LocalSinkChannel implements ISinkChannel {
return;
}
queue.setNoMoreTsBlocks(true);
+ long startTime = System.nanoTime();
sinkListener.onEndOfBlocks(this);
+ QUERY_STATISTICS.addCost(SINK_HANDLE_END_LISTENER, System.nanoTime() - startTime);
}
}
+
+ long startTime = System.nanoTime();
checkAndInvokeOnFinished();
+ QUERY_STATISTICS.addCost(CHECK_AND_INVOKE_ON_FINISHED, System.nanoTime() - startTime);
LOGGER.debug("[EndSetNoMoreTsBlocksOnLocal]");
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
index 9ffe6e09484..8c9015eb92d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
import org.apache.iotdb.db.mpp.execution.exchange.SharedTsBlockQueue;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -40,11 +41,15 @@ import static com.google.common.util.concurrent.Futures.nonCancellationPropagati
import static org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.createFullIdFrom;
import static org.apache.iotdb.db.mpp.metric.DataExchangeCostMetricSet.SOURCE_HANDLE_DESERIALIZE_TSBLOCK_LOCAL;
import static org.apache.iotdb.db.mpp.metric.DataExchangeCostMetricSet.SOURCE_HANDLE_GET_TSBLOCK_LOCAL;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.LOCAL_SOURCE_HANDLE_GET_TSBLOCK;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.LOCAL_SOURCE_HANDLE_SER_TSBLOCK;
public class LocalSourceHandle implements ISourceHandle {
private static final Logger LOGGER = LoggerFactory.getLogger(LocalSourceHandle.class);
+ private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance();
+
private TFragmentInstanceId localFragmentInstanceId;
private String localPlanNodeId;
private final SourceHandleListener sourceHandleListener;
@@ -121,8 +126,9 @@ public class LocalSourceHandle implements ISourceHandle {
checkAndInvokeOnFinished();
return tsBlock;
} finally {
- QUERY_METRICS.recordDataExchangeCost(
- SOURCE_HANDLE_GET_TSBLOCK_LOCAL, System.nanoTime() - startTime);
+ long costTime = System.nanoTime() - startTime;
+ QUERY_METRICS.recordDataExchangeCost(SOURCE_HANDLE_GET_TSBLOCK_LOCAL, costTime);
+ QUERY_STATISTICS.addCost(LOCAL_SOURCE_HANDLE_GET_TSBLOCK, costTime);
}
}
@@ -136,8 +142,9 @@ public class LocalSourceHandle implements ISourceHandle {
} catch (Exception e) {
throw new IoTDBException(e, TSStatusCode.TSBLOCK_SERIALIZE_ERROR.getStatusCode());
} finally {
- QUERY_METRICS.recordDataExchangeCost(
- SOURCE_HANDLE_DESERIALIZE_TSBLOCK_LOCAL, System.nanoTime() - startTime);
+ long costTime = System.nanoTime() - startTime;
+ QUERY_METRICS.recordDataExchangeCost(SOURCE_HANDLE_DESERIALIZE_TSBLOCK_LOCAL, costTime);
+ QUERY_STATISTICS.addCost(LOCAL_SOURCE_HANDLE_SER_TSBLOCK, costTime);
}
} else {
return null;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 7e6257ef201..b7fd82d964f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.idtable.IDTable;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.SessionInfo;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -45,6 +46,9 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.ADD_REFERENCE;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.QUERY_RESOURCE_LIST;
+
public class FragmentInstanceContext extends QueryContext {
private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstanceContext.class);
@@ -83,6 +87,8 @@ public class FragmentInstanceContext extends QueryContext {
// private final AtomicLong endFullGcCount = new AtomicLong(-1);
// private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
+ private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance();
+
public static FragmentInstanceContext createFragmentInstanceContext(
FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo) {
FragmentInstanceContext instanceContext =
@@ -279,6 +285,7 @@ public class FragmentInstanceContext extends QueryContext {
selectedDeviceIdSet.add(translatedPath.getDevice());
}
+ long startTime = System.nanoTime();
this.sharedQueryDataSource =
dataRegion.query(
pathList,
@@ -287,13 +294,16 @@ public class FragmentInstanceContext extends QueryContext {
selectedDeviceIdSet.size() == 1 ? selectedDeviceIdSet.iterator().next() : null,
this,
timeFilter != null ? timeFilter.copy() : null);
+ addOperationTime(QUERY_RESOURCE_LIST, System.nanoTime() - startTime);
// used files should be added before mergeLock is unlocked, or they may be deleted by
// running merge
if (sharedQueryDataSource != null) {
closedFilePaths = new HashSet<>();
unClosedFilePaths = new HashSet<>();
+ startTime = System.nanoTime();
addUsedFilesForQuery(sharedQueryDataSource);
+ addOperationTime(ADD_REFERENCE, System.nanoTime() - startTime);
}
} finally {
dataRegion.readUnlock();
@@ -375,4 +385,8 @@ public class FragmentInstanceContext extends QueryContext {
sourcePaths = null;
sharedQueryDataSource = null;
}
+
+ public void addOperationTime(String key, long costTimeInNanos) {
+ QUERY_STATISTICS.addCost(key, costTimeInNanos);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index d563f9cf24d..ae0bb57fa44 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.mpp.plan.planner.LocalExecutionPlanner;
import org.apache.iotdb.db.mpp.plan.planner.PipelineDriverFactory;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.db.utils.SetThreadName;
import io.airlift.stats.CounterStat;
@@ -52,6 +53,8 @@ import static java.util.Objects.requireNonNull;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceExecution.createFragmentInstanceExecution;
import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.LOCAL_EXECUTION_PLANNER;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CREATE_FI_CONTEXT;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CREATE_FI_EXEC;
public class FragmentInstanceManager {
@@ -76,6 +79,7 @@ public class FragmentInstanceManager {
private final ExecutorService intoOperationExecutor;
private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+ private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance();
public static FragmentInstanceManager getInstance() {
return FragmentInstanceManager.InstanceHolder.INSTANCE;
@@ -118,6 +122,7 @@ public class FragmentInstanceManager {
FragmentInstanceStateMachine stateMachine =
new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ long start = System.nanoTime();
FragmentInstanceContext context =
instanceContext.computeIfAbsent(
instanceId,
@@ -128,6 +133,7 @@ public class FragmentInstanceManager {
instance.getSessionInfo(),
dataRegion,
instance.getTimeFilter()));
+ QUERY_STATISTICS.addCost(CREATE_FI_CONTEXT, System.nanoTime() - start);
try {
List<PipelineDriverFactory> driverFactories =
@@ -141,6 +147,7 @@ public class FragmentInstanceManager {
// get the sink of last driver
ISink sink = drivers.get(drivers.size() - 1).getSink();
+ start = System.nanoTime();
return createFragmentInstanceExecution(
scheduler,
instanceId,
@@ -154,6 +161,8 @@ public class FragmentInstanceManager {
logger.warn("error when create FragmentInstanceExecution.", t);
stateMachine.failed(t);
return null;
+ } finally {
+ QUERY_STATISTICS.addCost(CREATE_FI_EXEC, System.nanoTime() - start);
}
});
@@ -171,7 +180,9 @@ public class FragmentInstanceManager {
return createFailedInstanceInfo(instanceId);
}
} finally {
- QUERY_METRICS.recordExecutionCost(LOCAL_EXECUTION_PLANNER, System.nanoTime() - startTime);
+ long endTime = System.nanoTime() - startTime;
+ QUERY_METRICS.recordExecutionCost(LOCAL_EXECUTION_PLANNER, endTime);
+ QUERY_STATISTICS.addCost(QueryStatistics.LOCAL_EXECUTION_PLANNER, endTime);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java
index c5c0db783d5..7a55772dcea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.mpp.common.SessionInfo;
import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import io.airlift.units.Duration;
@@ -45,6 +46,8 @@ public class OperatorContext {
private long totalExecutionTimeInNanos = 0L;
private long nextCalledCount = 0L;
+ private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance();
+
public OperatorContext(
int operatorId, PlanNodeId planNodeId, String operatorType, DriverContext driverContext) {
this.operatorId = operatorId;
@@ -65,6 +68,10 @@ public class OperatorContext {
this.driverContext = new DriverContext(fragmentInstanceContext, 0);
}
+ public void addOperatorTime(String key, long costTimeInNanos) {
+ QUERY_STATISTICS.addCost(key, costTimeInNanos);
+ }
+
public int getOperatorId() {
return operatorId;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/IdentitySinkOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/IdentitySinkOperator.java
index 80ac16af92b..ec10b224979 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/IdentitySinkOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/IdentitySinkOperator.java
@@ -57,7 +57,7 @@ public class IdentitySinkOperator implements Operator {
public boolean hasNext() throws Exception {
int currentIndex = downStreamChannelIndex.getCurrentIndex();
boolean currentChannelClosed = sinkHandle.isChannelClosed(currentIndex);
- if (!currentChannelClosed && children.get(currentIndex).hasNext()) {
+ if (!currentChannelClosed && children.get(currentIndex).hasNextWithTimer()) {
return true;
} else if (currentChannelClosed) {
// we close the child directly. The child could be an ExchangeOperator which is the downstream
@@ -96,7 +96,7 @@ public class IdentitySinkOperator implements Operator {
needToReturnNull = false;
return null;
}
- return children.get(downStreamChannelIndex.getCurrentIndex()).next();
+ return children.get(downStreamChannelIndex.getCurrentIndex()).nextWithTimer();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index 67b42a4319f..e2008692716 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -22,8 +22,10 @@ package org.apache.iotdb.db.mpp.execution.operator.source;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
@@ -41,6 +43,17 @@ import java.util.concurrent.TimeUnit;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateAggregationFromRawData;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.isAllAggregatorsHasFinalResult;
+import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA;
+import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.AGGREGATION_FROM_STATISTICS;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.AGG_SCAN_OPERATOR;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.BUILD_AGG_RES;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CAL_AGG_FROM_CHUNK;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CAL_AGG_FROM_CHUNK_STAT;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CAL_AGG_FROM_FILE;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CAL_AGG_FROM_FILE_STAT;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CAL_AGG_FROM_PAGE;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CAL_AGG_FROM_PAGE_STAT;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CAL_NEXT_AGG_RES;
public abstract class AbstractSeriesAggregationScanOperator extends AbstractDataSourceOperator {
@@ -122,28 +135,33 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData
// start stopwatch
long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
long start = System.nanoTime();
+ try {
+ while (System.nanoTime() - start < maxRuntime
+ && timeRangeIterator.hasNextTimeRange()
+ && !resultTsBlockBuilder.isFull()) {
+ // move to next time window
+ curTimeRange = timeRangeIterator.nextTimeRange();
+
+ // clear previous aggregation result
+ for (Aggregator aggregator : aggregators) {
+ aggregator.reset();
+ }
- while (System.nanoTime() - start < maxRuntime
- && timeRangeIterator.hasNextTimeRange()
- && !resultTsBlockBuilder.isFull()) {
- // move to next time window
- curTimeRange = timeRangeIterator.nextTimeRange();
-
- // clear previous aggregation result
- for (Aggregator aggregator : aggregators) {
- aggregator.reset();
+ long startTime = System.nanoTime();
+ // calculate aggregation result on current time window
+ calculateNextAggregationResult();
+ operatorContext.addOperatorTime(CAL_NEXT_AGG_RES, System.nanoTime() - startTime);
}
- // calculate aggregation result on current time window
- calculateNextAggregationResult();
- }
-
- if (resultTsBlockBuilder.getPositionCount() > 0) {
- TsBlock resultTsBlock = resultTsBlockBuilder.build();
- resultTsBlockBuilder.reset();
- return resultTsBlock;
- } else {
- return null;
+ if (resultTsBlockBuilder.getPositionCount() > 0) {
+ TsBlock resultTsBlock = resultTsBlockBuilder.build();
+ resultTsBlockBuilder.reset();
+ return resultTsBlock;
+ } else {
+ return null;
+ }
+ } finally {
+ operatorContext.addOperatorTime(AGG_SCAN_OPERATOR, System.nanoTime() - start);
}
}
@@ -184,8 +202,10 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData
}
protected void updateResultTsBlock() {
+ long startTime = System.nanoTime();
appendAggregationResult(
resultTsBlockBuilder, aggregators, timeRangeIterator.currentOutputTime());
+ operatorContext.addOperatorTime(BUILD_AGG_RES, System.nanoTime() - startTime);
}
protected boolean calcFromCachedData() {
@@ -193,139 +213,205 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData
}
private boolean calcFromRawData(TsBlock tsBlock) {
- Pair<Boolean, TsBlock> calcResult =
- calculateAggregationFromRawData(tsBlock, aggregators, curTimeRange, ascending);
- inputTsBlock = calcResult.getRight();
- return calcResult.getLeft();
+ long startTime = System.nanoTime();
+ try {
+ Pair<Boolean, TsBlock> calcResult =
+ calculateAggregationFromRawData(tsBlock, aggregators, curTimeRange, ascending);
+ inputTsBlock = calcResult.getRight();
+ return calcResult.getLeft();
+ } finally {
+ operatorContext.addOperatorTime(
+ QueryStatistics.CAL_AGG_FROM_RAW_DATA, System.nanoTime() - startTime);
+ QueryMetricsManager.getInstance()
+ .recordExecutionCost(AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
+ }
}
protected void calcFromStatistics(Statistics[] statistics) {
- for (Aggregator aggregator : aggregators) {
- if (aggregator.hasFinalResult()) {
- continue;
+ long startTime = System.nanoTime();
+ try {
+ for (Aggregator aggregator : aggregators) {
+ if (aggregator.hasFinalResult()) {
+ continue;
+ }
+ aggregator.processStatistics(statistics);
}
- aggregator.processStatistics(statistics);
+ } finally {
+ QueryMetricsManager.getInstance()
+ .recordExecutionCost(AGGREGATION_FROM_STATISTICS, System.nanoTime() - startTime);
+ operatorContext.addOperatorTime(
+ QueryStatistics.CAL_AGG_FROM_STAT, System.nanoTime() - startTime);
}
}
protected boolean readAndCalcFromFile() throws IOException {
- while (seriesScanUtil.hasNextFile()) {
- if (canUseCurrentFileStatistics()) {
- Statistics fileTimeStatistics = seriesScanUtil.currentFileTimeStatistics();
- if (fileTimeStatistics.getStartTime() > curTimeRange.getMax()) {
- if (ascending) {
- return true;
- } else {
- seriesScanUtil.skipCurrentFile();
- continue;
+ long startTime = System.nanoTime();
+ long chunkCostTime = 0L;
+ try {
+ while (seriesScanUtil.hasNextFile()) {
+ long start = System.nanoTime();
+ try {
+ if (canUseCurrentFileStatistics()) {
+ Statistics fileTimeStatistics = seriesScanUtil.currentFileTimeStatistics();
+ if (fileTimeStatistics.getStartTime() > curTimeRange.getMax()) {
+ if (ascending) {
+ return true;
+ } else {
+ seriesScanUtil.skipCurrentFile();
+ continue;
+ }
+ }
+ // calc from fileMetaData
+ if (curTimeRange.contains(
+ fileTimeStatistics.getStartTime(), fileTimeStatistics.getEndTime())) {
+ Statistics[] statisticsList = new Statistics[subSensorSize];
+ for (int i = 0; i < subSensorSize; i++) {
+ statisticsList[i] = seriesScanUtil.currentFileStatistics(i);
+ }
+ calcFromStatistics(statisticsList);
+ seriesScanUtil.skipCurrentFile();
+ if (isAllAggregatorsHasFinalResult(aggregators) && !isGroupByQuery) {
+ return true;
+ } else {
+ continue;
+ }
+ }
}
+ } finally {
+ operatorContext.addOperatorTime(CAL_AGG_FROM_FILE_STAT, System.nanoTime() - start);
}
- // calc from fileMetaData
- if (curTimeRange.contains(
- fileTimeStatistics.getStartTime(), fileTimeStatistics.getEndTime())) {
- Statistics[] statisticsList = new Statistics[subSensorSize];
- for (int i = 0; i < subSensorSize; i++) {
- statisticsList[i] = seriesScanUtil.currentFileStatistics(i);
- }
- calcFromStatistics(statisticsList);
- seriesScanUtil.skipCurrentFile();
- if (isAllAggregatorsHasFinalResult(aggregators) && !isGroupByQuery) {
+
+ // read chunk
+ start = System.nanoTime();
+ try {
+ if (readAndCalcFromChunk()) {
return true;
- } else {
- continue;
}
+ } finally {
+ chunkCostTime += System.nanoTime() - start;
}
}
-
- // read chunk
- if (readAndCalcFromChunk()) {
- return true;
- }
+ return false;
+ } finally {
+ operatorContext.addOperatorTime(
+ CAL_AGG_FROM_FILE, System.nanoTime() - startTime - chunkCostTime);
}
-
- return false;
}
protected boolean readAndCalcFromChunk() throws IOException {
- while (seriesScanUtil.hasNextChunk()) {
- if (canUseCurrentChunkStatistics()) {
- Statistics chunkTimeStatistics = seriesScanUtil.currentChunkTimeStatistics();
- if (chunkTimeStatistics.getStartTime() > curTimeRange.getMax()) {
- if (ascending) {
- return true;
- } else {
- seriesScanUtil.skipCurrentChunk();
- continue;
+ long startTime = System.nanoTime();
+ long pageCostTime = 0L;
+ try {
+ while (seriesScanUtil.hasNextChunk()) {
+ long start = System.nanoTime();
+ try {
+ if (canUseCurrentChunkStatistics()) {
+ Statistics chunkTimeStatistics = seriesScanUtil.currentChunkTimeStatistics();
+ if (chunkTimeStatistics.getStartTime() > curTimeRange.getMax()) {
+ if (ascending) {
+ return true;
+ } else {
+ seriesScanUtil.skipCurrentChunk();
+ continue;
+ }
+ }
+ // calc from chunkMetaData
+ if (curTimeRange.contains(
+ chunkTimeStatistics.getStartTime(), chunkTimeStatistics.getEndTime())) {
+ // calc from chunkMetaData
+ Statistics[] statisticsList = new Statistics[subSensorSize];
+ for (int i = 0; i < subSensorSize; i++) {
+ statisticsList[i] = seriesScanUtil.currentChunkStatistics(i);
+ }
+ calcFromStatistics(statisticsList);
+ seriesScanUtil.skipCurrentChunk();
+ if (isAllAggregatorsHasFinalResult(aggregators) && !isGroupByQuery) {
+ return true;
+ } else {
+ continue;
+ }
+ }
}
+ } finally {
+ operatorContext.addOperatorTime(CAL_AGG_FROM_CHUNK_STAT, System.nanoTime() - start);
}
- // calc from chunkMetaData
- if (curTimeRange.contains(
- chunkTimeStatistics.getStartTime(), chunkTimeStatistics.getEndTime())) {
- // calc from chunkMetaData
- Statistics[] statisticsList = new Statistics[subSensorSize];
- for (int i = 0; i < subSensorSize; i++) {
- statisticsList[i] = seriesScanUtil.currentChunkStatistics(i);
- }
- calcFromStatistics(statisticsList);
- seriesScanUtil.skipCurrentChunk();
- if (isAllAggregatorsHasFinalResult(aggregators) && !isGroupByQuery) {
+
+ // read page
+ start = System.nanoTime();
+ try {
+ if (readAndCalcFromPage()) {
return true;
- } else {
- continue;
}
+ } finally {
+ pageCostTime += System.nanoTime() - start;
}
}
-
- // read page
- if (readAndCalcFromPage()) {
- return true;
- }
+ return false;
+ } finally {
+ operatorContext.addOperatorTime(
+ CAL_AGG_FROM_CHUNK, System.nanoTime() - startTime - pageCostTime);
}
- return false;
}
protected boolean readAndCalcFromPage() throws IOException {
- while (seriesScanUtil.hasNextPage()) {
- if (canUseCurrentPageStatistics()) {
- Statistics pageTimeStatistics = seriesScanUtil.currentPageTimeStatistics();
- // There is no more eligible points in current time range
- if (pageTimeStatistics.getStartTime() > curTimeRange.getMax()) {
- if (ascending) {
- return true;
- } else {
- seriesScanUtil.skipCurrentPage();
- continue;
+ long startTime = System.nanoTime();
+ long rawDataCostTime = 0L;
+ try {
+ while (seriesScanUtil.hasNextPage()) {
+ long start = System.nanoTime();
+ try {
+ if (canUseCurrentPageStatistics()) {
+ Statistics pageTimeStatistics = seriesScanUtil.currentPageTimeStatistics();
+ // There is no more eligible points in current time range
+ if (pageTimeStatistics.getStartTime() > curTimeRange.getMax()) {
+ if (ascending) {
+ return true;
+ } else {
+ seriesScanUtil.skipCurrentPage();
+ continue;
+ }
+ }
+ // can use pageHeader
+ if (curTimeRange.contains(
+ pageTimeStatistics.getStartTime(), pageTimeStatistics.getEndTime())) {
+ Statistics[] statisticsList = new Statistics[subSensorSize];
+ for (int i = 0; i < subSensorSize; i++) {
+ statisticsList[i] = seriesScanUtil.currentPageStatistics(i);
+ }
+ calcFromStatistics(statisticsList);
+ seriesScanUtil.skipCurrentPage();
+ if (isAllAggregatorsHasFinalResult(aggregators) && !isGroupByQuery) {
+ return true;
+ } else {
+ continue;
+ }
+ }
}
+ } finally {
+ operatorContext.addOperatorTime(CAL_AGG_FROM_PAGE_STAT, System.nanoTime() - start);
}
- // can use pageHeader
- if (curTimeRange.contains(
- pageTimeStatistics.getStartTime(), pageTimeStatistics.getEndTime())) {
- Statistics[] statisticsList = new Statistics[subSensorSize];
- for (int i = 0; i < subSensorSize; i++) {
- statisticsList[i] = seriesScanUtil.currentPageStatistics(i);
- }
- calcFromStatistics(statisticsList);
- seriesScanUtil.skipCurrentPage();
- if (isAllAggregatorsHasFinalResult(aggregators) && !isGroupByQuery) {
+
+ // calc from page data
+ TsBlock tsBlock = seriesScanUtil.nextPage();
+ if (tsBlock == null || tsBlock.isEmpty()) {
+ continue;
+ }
+
+ // calc from raw data
+ start = System.nanoTime();
+ try {
+ if (calcFromRawData(tsBlock)) {
return true;
- } else {
- continue;
}
+ } finally {
+ rawDataCostTime += System.nanoTime() - start;
}
}
-
- // calc from page data
- TsBlock tsBlock = seriesScanUtil.nextPage();
- if (tsBlock == null || tsBlock.isEmpty()) {
- continue;
- }
-
- // calc from raw data
- if (calcFromRawData(tsBlock)) {
- return true;
- }
+ return false;
+ } finally {
+ operatorContext.addOperatorTime(
+ CAL_AGG_FROM_PAGE, System.nanoTime() - startTime - rawDataCostTime);
}
- return false;
}
protected boolean canUseCurrentFileStatistics() throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
index ebfe5f87b11..1e1cdd09679 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.reader.universal.AlignedDescPriorityMergeReader;
import org.apache.iotdb.db.query.reader.universal.AlignedPriorityMergeReader;
@@ -130,68 +131,87 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil {
@Override
protected void filterFirstTimeSeriesMetadata() throws IOException {
- if (firstTimeSeriesMetadata != null
- && !isFileOverlapped()
- && !firstTimeSeriesMetadata.isModified()) {
- Filter queryFilter = scanOptions.getQueryFilter();
- if (queryFilter != null) {
- // TODO accept valueStatisticsList to filter
- if (!queryFilter.satisfy(firstTimeSeriesMetadata.getStatistics())) {
- skipCurrentFile();
- }
- } else {
- // For aligned series, When we only query some measurements under an aligned device, if the
- // values of these queried measurements at a timestamp are all null, the timestamp will not
- // be selected.
- // NOTE: if we change the query semantic in the future for aligned series, we need to remove
- // this check here.
- long rowCount =
- ((AlignedTimeSeriesMetadata) firstTimeSeriesMetadata).getTimeStatistics().getCount();
- for (Statistics statistics :
- ((AlignedTimeSeriesMetadata) firstTimeSeriesMetadata).getValueStatisticsList()) {
- if (statistics == null || statistics.hasNullValue(rowCount)) {
- return;
+ long startTime = System.nanoTime();
+ try {
+ if (firstTimeSeriesMetadata != null
+ && !isFileOverlapped()
+ && !firstTimeSeriesMetadata.isModified()) {
+ Filter queryFilter = scanOptions.getQueryFilter();
+ if (queryFilter != null) {
+ // TODO accept valueStatisticsList to filter
+ if (!queryFilter.satisfy(firstTimeSeriesMetadata.getStatistics())) {
+ skipCurrentFile();
+ }
+ } else {
+ // For aligned series, When we only query some measurements under an aligned device, if
+ // the
+ // values of these queried measurements at a timestamp are all null, the timestamp will
+ // not
+ // be selected.
+ // NOTE: if we change the query semantic in the future for aligned series, we need to
+ // remove
+ // this check here.
+ long rowCount =
+ ((AlignedTimeSeriesMetadata) firstTimeSeriesMetadata).getTimeStatistics().getCount();
+ for (Statistics statistics :
+ ((AlignedTimeSeriesMetadata) firstTimeSeriesMetadata).getValueStatisticsList()) {
+ if (statistics == null || statistics.hasNullValue(rowCount)) {
+ return;
+ }
+ }
+ // When the number of points in all value chunk groups is the same as that in the time
+ // chunk
+ // group, it means that there is no null value, and all timestamps will be selected.
+ if (paginationController.hasCurOffset(rowCount)) {
+ skipCurrentFile();
+ paginationController.consumeOffset(rowCount);
}
- }
- // When the number of points in all value chunk groups is the same as that in the time chunk
- // group, it means that there is no null value, and all timestamps will be selected.
- if (paginationController.hasCurOffset(rowCount)) {
- skipCurrentFile();
- paginationController.consumeOffset(rowCount);
}
}
+ } finally {
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.FILTER_FIRST_TIMESERIES_METADATA, System.nanoTime() - startTime);
}
}
@Override
protected void filterFirstChunkMetadata() throws IOException {
- if (firstChunkMetadata != null && !isChunkOverlapped() && !firstChunkMetadata.isModified()) {
- Filter queryFilter = scanOptions.getQueryFilter();
- if (queryFilter != null) {
- // TODO accept valueStatisticsList to filter
- if (!queryFilter.satisfy(firstChunkMetadata.getStatistics())) {
- skipCurrentChunk();
- }
- } else {
- // For aligned series, When we only query some measurements under an aligned device, if the
- // values of these queried measurements at a timestamp are all null, the timestamp will not
- // be selected.
- // NOTE: if we change the query semantic in the future for aligned series, we need to remove
- // this check here.
- long rowCount = firstChunkMetadata.getStatistics().getCount();
- for (Statistics statistics :
- ((AlignedChunkMetadata) firstChunkMetadata).getValueStatisticsList()) {
- if (statistics == null || statistics.hasNullValue(rowCount)) {
- return;
+ long startTime = System.nanoTime();
+ try {
+ if (firstChunkMetadata != null && !isChunkOverlapped() && !firstChunkMetadata.isModified()) {
+ Filter queryFilter = scanOptions.getQueryFilter();
+ if (queryFilter != null) {
+ // TODO accept valueStatisticsList to filter
+ if (!queryFilter.satisfy(firstChunkMetadata.getStatistics())) {
+ skipCurrentChunk();
+ }
+ } else {
+ // For aligned series, When we only query some measurements under an aligned device, if
+ // the
+ // values of these queried measurements at a timestamp are all null, the timestamp will
+ // not
+ // be selected.
+ // NOTE: if we change the query semantic in the future for aligned series, we need to
+ // remove
+ // this check here.
+ long rowCount = firstChunkMetadata.getStatistics().getCount();
+ for (Statistics statistics :
+ ((AlignedChunkMetadata) firstChunkMetadata).getValueStatisticsList()) {
+ if (statistics == null || statistics.hasNullValue(rowCount)) {
+ return;
+ }
+ }
+ // When the number of points in all value chunks is the same as that in the time chunk, it
+ // means that there is no null value, and all timestamps will be selected.
+ if (paginationController.hasCurOffset(rowCount)) {
+ skipCurrentChunk();
+ paginationController.consumeOffset(rowCount);
}
- }
- // When the number of points in all value chunks is the same as that in the time chunk, it
- // means that there is no null value, and all timestamps will be selected.
- if (paginationController.hasCurOffset(rowCount)) {
- skipCurrentChunk();
- paginationController.consumeOffset(rowCount);
}
}
+ } finally {
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.FILTER_FIRST_CHUNK_METADATA, System.nanoTime() - startTime);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index d3988be3667..21915712a5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.reader.chunk.MemAlignedPageReader;
import org.apache.iotdb.db.query.reader.chunk.MemPageReader;
@@ -66,6 +67,7 @@ import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.BUILD_TSBLO
import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.BUILD_TSBLOCK_FROM_PAGE_READER_ALIGNED_MEM;
import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.BUILD_TSBLOCK_FROM_PAGE_READER_NONALIGNED_DISK;
import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.BUILD_TSBLOCK_FROM_PAGE_READER_NONALIGNED_MEM;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.PAGE_READER;
public class SeriesScanUtil {
@@ -174,42 +176,48 @@ public class SeriesScanUtil {
}
public boolean hasNextFile() throws IOException {
- if (!paginationController.hasCurLimit()) {
- return false;
- }
+ long startTime = System.nanoTime();
+ try {
+ if (!paginationController.hasCurLimit()) {
+ return false;
+ }
- if (!unSeqPageReaders.isEmpty()
- || firstPageReader != null
- || mergeReader.hasNextTimeValuePair()) {
- throw new IOException(
- "all cached pages should be consumed first unSeqPageReaders.isEmpty() is "
- + unSeqPageReaders.isEmpty()
- + " firstPageReader != null is "
- + (firstPageReader != null)
- + " mergeReader.hasNextTimeValuePair() = "
- + mergeReader.hasNextTimeValuePair());
- }
+ if (!unSeqPageReaders.isEmpty()
+ || firstPageReader != null
+ || mergeReader.hasNextTimeValuePair()) {
+ throw new IOException(
+ "all cached pages should be consumed first unSeqPageReaders.isEmpty() is "
+ + unSeqPageReaders.isEmpty()
+ + " firstPageReader != null is "
+ + (firstPageReader != null)
+ + " mergeReader.hasNextTimeValuePair() = "
+ + mergeReader.hasNextTimeValuePair());
+ }
- if (firstChunkMetadata != null || !cachedChunkMetadata.isEmpty()) {
- throw new IOException("all cached chunks should be consumed first");
- }
+ if (firstChunkMetadata != null || !cachedChunkMetadata.isEmpty()) {
+ throw new IOException("all cached chunks should be consumed first");
+ }
- if (firstTimeSeriesMetadata != null) {
- return true;
- }
+ if (firstTimeSeriesMetadata != null) {
+ return true;
+ }
- while (firstTimeSeriesMetadata == null
- && (orderUtils.hasNextSeqResource()
- || orderUtils.hasNextUnseqResource()
- || !seqTimeSeriesMetadata.isEmpty()
- || !unSeqTimeSeriesMetadata.isEmpty())) {
- // init first time series metadata whose startTime is minimum
- tryToUnpackAllOverlappedFilesToTimeSeriesMetadata();
- // filter file based on push-down conditions
- filterFirstTimeSeriesMetadata();
- }
+ while (firstTimeSeriesMetadata == null
+ && (orderUtils.hasNextSeqResource()
+ || orderUtils.hasNextUnseqResource()
+ || !seqTimeSeriesMetadata.isEmpty()
+ || !unSeqTimeSeriesMetadata.isEmpty())) {
+ // init first time series metadata whose startTime is minimum
+ tryToUnpackAllOverlappedFilesToTimeSeriesMetadata();
+ // filter file based on push-down conditions
+ filterFirstTimeSeriesMetadata();
+ }
- return firstTimeSeriesMetadata != null;
+ return firstTimeSeriesMetadata != null;
+ } finally {
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.HAS_NEXT_FILE, System.nanoTime() - startTime);
+ }
}
boolean isFileOverlapped() throws IOException {
@@ -254,50 +262,62 @@ public class SeriesScanUtil {
* overlapped chunks are consumed
*/
public boolean hasNextChunk() throws IOException {
- if (!paginationController.hasCurLimit()) {
- return false;
- }
+ long startTime = System.nanoTime();
+ try {
+ if (!paginationController.hasCurLimit()) {
+ return false;
+ }
- if (!unSeqPageReaders.isEmpty()
- || firstPageReader != null
- || mergeReader.hasNextTimeValuePair()) {
- throw new IOException(
- "all cached pages should be consumed first unSeqPageReaders.isEmpty() is "
- + unSeqPageReaders.isEmpty()
- + " firstPageReader != null is "
- + (firstPageReader != null)
- + " mergeReader.hasNextTimeValuePair() = "
- + mergeReader.hasNextTimeValuePair());
- }
+ if (!unSeqPageReaders.isEmpty()
+ || firstPageReader != null
+ || mergeReader.hasNextTimeValuePair()) {
+ throw new IOException(
+ "all cached pages should be consumed first unSeqPageReaders.isEmpty() is "
+ + unSeqPageReaders.isEmpty()
+ + " firstPageReader != null is "
+ + (firstPageReader != null)
+ + " mergeReader.hasNextTimeValuePair() = "
+ + mergeReader.hasNextTimeValuePair());
+ }
- if (firstChunkMetadata != null) {
- return true;
- // hasNextFile() has not been invoked
- } else if (firstTimeSeriesMetadata == null && cachedChunkMetadata.isEmpty()) {
- return false;
- }
+ if (firstChunkMetadata != null) {
+ return true;
+ // hasNextFile() has not been invoked
+ } else if (firstTimeSeriesMetadata == null && cachedChunkMetadata.isEmpty()) {
+ return false;
+ }
- while (firstChunkMetadata == null && (!cachedChunkMetadata.isEmpty() || hasNextFile())) {
- initFirstChunkMetadata();
- // filter chunk based on push-down conditions
- filterFirstChunkMetadata();
+ while (firstChunkMetadata == null && (!cachedChunkMetadata.isEmpty() || hasNextFile())) {
+ initFirstChunkMetadata();
+ // filter chunk based on push-down conditions
+ filterFirstChunkMetadata();
+ }
+ return firstChunkMetadata != null;
+ } finally {
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.HAS_NEXT_CHUNK, System.nanoTime() - startTime);
}
- return firstChunkMetadata != null;
}
protected void filterFirstChunkMetadata() throws IOException {
- if (firstChunkMetadata != null && !isChunkOverlapped() && !firstChunkMetadata.isModified()) {
- Filter queryFilter = scanOptions.getQueryFilter();
- Statistics statistics = firstChunkMetadata.getStatistics();
- if (queryFilter == null || queryFilter.allSatisfy(statistics)) {
- long rowCount = statistics.getCount();
- if (paginationController.hasCurOffset(rowCount)) {
+ long startTime = System.nanoTime();
+ try {
+ if (firstChunkMetadata != null && !isChunkOverlapped() && !firstChunkMetadata.isModified()) {
+ Filter queryFilter = scanOptions.getQueryFilter();
+ Statistics statistics = firstChunkMetadata.getStatistics();
+ if (queryFilter == null || queryFilter.allSatisfy(statistics)) {
+ long rowCount = statistics.getCount();
+ if (paginationController.hasCurOffset(rowCount)) {
+ skipCurrentChunk();
+ paginationController.consumeOffset(rowCount);
+ }
+ } else if (!queryFilter.satisfy(statistics)) {
skipCurrentChunk();
- paginationController.consumeOffset(rowCount);
}
- } else if (!queryFilter.satisfy(statistics)) {
- skipCurrentChunk();
}
+ } finally {
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.FILTER_FIRST_CHUNK_METADATA, System.nanoTime() - startTime);
}
}
@@ -401,60 +421,67 @@ public class SeriesScanUtil {
@SuppressWarnings("squid:S3776")
// Suppress high Cognitive Complexity warning
public boolean hasNextPage() throws IOException {
- if (!paginationController.hasCurLimit()) {
- return false;
- }
+ long startTime = System.nanoTime();
+ try {
+ if (!paginationController.hasCurLimit()) {
+ return false;
+ }
- /*
- * has overlapped data before
- */
- if (hasCachedNextOverlappedPage) {
- return true;
- } else if (mergeReader.hasNextTimeValuePair() || firstPageOverlapped()) {
- if (hasNextOverlappedPage()) {
- cachedTsBlock = nextOverlappedPage();
- if (cachedTsBlock != null && !cachedTsBlock.isEmpty()) {
- hasCachedNextOverlappedPage = true;
- return true;
+ /*
+ * has overlapped data before
+ */
+ if (hasCachedNextOverlappedPage) {
+ return true;
+ } else if (mergeReader.hasNextTimeValuePair() || firstPageOverlapped()) {
+ if (hasNextOverlappedPage()) {
+ cachedTsBlock = nextOverlappedPage();
+ if (cachedTsBlock != null && !cachedTsBlock.isEmpty()) {
+ hasCachedNextOverlappedPage = true;
+ return true;
+ }
}
}
- }
- if (firstPageReader != null) {
- return true;
- }
+ if (firstPageReader != null) {
+ return true;
+ }
- /*
- * construct first page reader
- */
- if (firstChunkMetadata != null) {
- /*
- * try to unpack all overlapped ChunkMetadata to cachedPageReaders
- */
- unpackAllOverlappedChunkMetadataToPageReaders(
- orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), true);
- } else {
/*
- * first chunk metadata is already unpacked, consume cached pages
+ * construct first page reader
*/
- initFirstPageReader();
- }
+ if (firstChunkMetadata != null) {
+ /*
+ * try to unpack all overlapped ChunkMetadata to cachedPageReaders
+ */
+ unpackAllOverlappedChunkMetadataToPageReaders(
+ orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), true);
+ } else {
+ /*
+ * first chunk metadata is already unpacked, consume cached pages
+ */
+ initFirstPageReader();
+ }
- if (isExistOverlappedPage()) {
- return true;
- }
+ if (isExistOverlappedPage()) {
+ return true;
+ }
- // make sure firstPageReader won't be null while the unSeqPageReaders has more cached page
- // readers
- while (firstPageReader == null && (!seqPageReaders.isEmpty() || !unSeqPageReaders.isEmpty())) {
+ // make sure firstPageReader won't be null while the unSeqPageReaders has more cached page
+ // readers
+ while (firstPageReader == null
+ && (!seqPageReaders.isEmpty() || !unSeqPageReaders.isEmpty())) {
- initFirstPageReader();
+ initFirstPageReader();
- if (isExistOverlappedPage()) {
- return true;
+ if (isExistOverlappedPage()) {
+ return true;
+ }
}
+ return firstPageReader != null;
+ } finally {
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.HAS_NEXT_PAGE, System.nanoTime() - startTime);
}
- return firstPageReader != null;
}
private boolean isExistOverlappedPage() throws IOException {
@@ -765,66 +792,73 @@ public class SeriesScanUtil {
*/
timeValuePair = mergeReader.nextTimeValuePair();
- Object valueForFilter = timeValuePair.getValue().getValue();
-
- // TODO fix value filter firstNotNullObject, currently, if it's a value filter, it will
- // only accept AlignedPath with only one sub sensor
- if (timeValuePair.getValue().getDataType() == TSDataType.VECTOR) {
- for (TsPrimitiveType tsPrimitiveType : timeValuePair.getValue().getVector()) {
- if (tsPrimitiveType != null) {
- valueForFilter = tsPrimitiveType.getValue();
- break;
+ long st = System.nanoTime();
+ try {
+ Object valueForFilter = timeValuePair.getValue().getValue();
+
+ // TODO fix value filter firstNotNullObject, currently, if it's a value filter, it
+ // will
+ // only accept AlignedPath with only one sub sensor
+ if (timeValuePair.getValue().getDataType() == TSDataType.VECTOR) {
+ for (TsPrimitiveType tsPrimitiveType : timeValuePair.getValue().getVector()) {
+ if (tsPrimitiveType != null) {
+ valueForFilter = tsPrimitiveType.getValue();
+ break;
+ }
}
}
- }
- Filter queryFilter = scanOptions.getQueryFilter();
- if (queryFilter != null
- && !queryFilter.satisfy(timeValuePair.getTimestamp(), valueForFilter)) {
- continue;
- }
- if (paginationController.hasCurOffset()) {
- paginationController.consumeOffset();
- continue;
- }
- if (paginationController.hasCurLimit()) {
- timeBuilder.writeLong(timeValuePair.getTimestamp());
- switch (dataType) {
- case BOOLEAN:
- builder.getColumnBuilder(0).writeBoolean(timeValuePair.getValue().getBoolean());
- break;
- case INT32:
- builder.getColumnBuilder(0).writeInt(timeValuePair.getValue().getInt());
- break;
- case INT64:
- builder.getColumnBuilder(0).writeLong(timeValuePair.getValue().getLong());
- break;
- case FLOAT:
- builder.getColumnBuilder(0).writeFloat(timeValuePair.getValue().getFloat());
- break;
- case DOUBLE:
- builder.getColumnBuilder(0).writeDouble(timeValuePair.getValue().getDouble());
- break;
- case TEXT:
- builder.getColumnBuilder(0).writeBinary(timeValuePair.getValue().getBinary());
- break;
- case VECTOR:
- TsPrimitiveType[] values = timeValuePair.getValue().getVector();
- for (int i = 0; i < values.length; i++) {
- if (values[i] == null) {
- builder.getColumnBuilder(i).appendNull();
- } else {
- builder.getColumnBuilder(i).writeTsPrimitiveType(values[i]);
+ Filter queryFilter = scanOptions.getQueryFilter();
+ if (queryFilter != null
+ && !queryFilter.satisfy(timeValuePair.getTimestamp(), valueForFilter)) {
+ continue;
+ }
+ if (paginationController.hasCurOffset()) {
+ paginationController.consumeOffset();
+ continue;
+ }
+ if (paginationController.hasCurLimit()) {
+ timeBuilder.writeLong(timeValuePair.getTimestamp());
+ switch (dataType) {
+ case BOOLEAN:
+ builder.getColumnBuilder(0).writeBoolean(timeValuePair.getValue().getBoolean());
+ break;
+ case INT32:
+ builder.getColumnBuilder(0).writeInt(timeValuePair.getValue().getInt());
+ break;
+ case INT64:
+ builder.getColumnBuilder(0).writeLong(timeValuePair.getValue().getLong());
+ break;
+ case FLOAT:
+ builder.getColumnBuilder(0).writeFloat(timeValuePair.getValue().getFloat());
+ break;
+ case DOUBLE:
+ builder.getColumnBuilder(0).writeDouble(timeValuePair.getValue().getDouble());
+ break;
+ case TEXT:
+ builder.getColumnBuilder(0).writeBinary(timeValuePair.getValue().getBinary());
+ break;
+ case VECTOR:
+ TsPrimitiveType[] values = timeValuePair.getValue().getVector();
+ for (int i = 0; i < values.length; i++) {
+ if (values[i] == null) {
+ builder.getColumnBuilder(i).appendNull();
+ } else {
+ builder.getColumnBuilder(i).writeTsPrimitiveType(values[i]);
+ }
}
- }
- break;
- default:
- throw new UnSupportedDataTypeException(String.valueOf(dataType));
+ break;
+ default:
+ throw new UnSupportedDataTypeException(String.valueOf(dataType));
+ }
+ builder.declarePosition();
+ paginationController.consumeLimit();
+ } else {
+ break;
}
- builder.declarePosition();
- paginationController.consumeLimit();
- } else {
- break;
+ } finally {
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.MERGE_READER_BUILD_RES, System.nanoTime() - st);
}
}
hasCachedNextOverlappedPage = !builder.isEmpty();
@@ -843,11 +877,13 @@ public class SeriesScanUtil {
}
}
} finally {
+ long costTime = System.nanoTime() - startTime;
QUERY_METRICS.recordSeriesScanCost(
isAligned
? BUILD_TSBLOCK_FROM_MERGE_READER_ALIGNED
: BUILD_TSBLOCK_FROM_MERGE_READER_NONALIGNED,
- System.nanoTime() - startTime);
+ costTime);
+ QueryStatistics.getInstance().addCost(QueryStatistics.HAS_NEXT_OVERLAPPED_PAGE, costTime);
}
}
@@ -992,18 +1028,23 @@ public class SeriesScanUtil {
* find end time of the first TimeSeriesMetadata
*/
long endTime = -1L;
- if (!seqTimeSeriesMetadata.isEmpty() && unSeqTimeSeriesMetadata.isEmpty()) {
- // only has seq
- endTime = orderUtils.getOverlapCheckTime(seqTimeSeriesMetadata.get(0).getStatistics());
- } else if (seqTimeSeriesMetadata.isEmpty() && !unSeqTimeSeriesMetadata.isEmpty()) {
- // only has unseq
- endTime = orderUtils.getOverlapCheckTime(unSeqTimeSeriesMetadata.peek().getStatistics());
- } else if (!seqTimeSeriesMetadata.isEmpty()) {
- // has seq and unseq
- endTime =
- orderUtils.getCurrentEndPoint(
- seqTimeSeriesMetadata.get(0).getStatistics(),
- unSeqTimeSeriesMetadata.peek().getStatistics());
+ long t1 = System.nanoTime();
+ try {
+ if (!seqTimeSeriesMetadata.isEmpty() && unSeqTimeSeriesMetadata.isEmpty()) {
+ // only has seq
+ endTime = orderUtils.getOverlapCheckTime(seqTimeSeriesMetadata.get(0).getStatistics());
+ } else if (seqTimeSeriesMetadata.isEmpty() && !unSeqTimeSeriesMetadata.isEmpty()) {
+ // only has unseq
+ endTime = orderUtils.getOverlapCheckTime(unSeqTimeSeriesMetadata.peek().getStatistics());
+ } else if (!seqTimeSeriesMetadata.isEmpty()) {
+ // has seq and unseq
+ endTime =
+ orderUtils.getCurrentEndPoint(
+ seqTimeSeriesMetadata.get(0).getStatistics(),
+ unSeqTimeSeriesMetadata.peek().getStatistics());
+ }
+ } finally {
+ QueryStatistics.getInstance().addCost(QueryStatistics.FIND_END_TIME, System.nanoTime() - t1);
}
/*
@@ -1016,39 +1057,51 @@ public class SeriesScanUtil {
/*
* update the first TimeSeriesMetadata
*/
- if (!seqTimeSeriesMetadata.isEmpty() && unSeqTimeSeriesMetadata.isEmpty()) {
- // only has seq
- firstTimeSeriesMetadata = seqTimeSeriesMetadata.remove(0);
- } else if (seqTimeSeriesMetadata.isEmpty() && !unSeqTimeSeriesMetadata.isEmpty()) {
- // only has unseq
- firstTimeSeriesMetadata = unSeqTimeSeriesMetadata.poll();
- } else if (!seqTimeSeriesMetadata.isEmpty()) {
- // has seq and unseq
- if (orderUtils.isTakeSeqAsFirst(
- seqTimeSeriesMetadata.get(0).getStatistics(),
- unSeqTimeSeriesMetadata.peek().getStatistics())) {
+ t1 = System.nanoTime();
+ try {
+ if (!seqTimeSeriesMetadata.isEmpty() && unSeqTimeSeriesMetadata.isEmpty()) {
+ // only has seq
firstTimeSeriesMetadata = seqTimeSeriesMetadata.remove(0);
- } else {
+ } else if (seqTimeSeriesMetadata.isEmpty() && !unSeqTimeSeriesMetadata.isEmpty()) {
+ // only has unseq
firstTimeSeriesMetadata = unSeqTimeSeriesMetadata.poll();
+ } else if (!seqTimeSeriesMetadata.isEmpty()) {
+ // has seq and unseq
+ if (orderUtils.isTakeSeqAsFirst(
+ seqTimeSeriesMetadata.get(0).getStatistics(),
+ unSeqTimeSeriesMetadata.peek().getStatistics())) {
+ firstTimeSeriesMetadata = seqTimeSeriesMetadata.remove(0);
+ } else {
+ firstTimeSeriesMetadata = unSeqTimeSeriesMetadata.poll();
+ }
}
+ } finally {
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.PICK_FIRST_TIMESERIES_METADATA, System.nanoTime() - t1);
}
}
protected void filterFirstTimeSeriesMetadata() throws IOException {
- if (firstTimeSeriesMetadata != null
- && !isFileOverlapped()
- && !firstTimeSeriesMetadata.isModified()) {
- Filter queryFilter = scanOptions.getQueryFilter();
- Statistics statistics = firstTimeSeriesMetadata.getStatistics();
- if (queryFilter == null || queryFilter.allSatisfy(statistics)) {
- long rowCount = statistics.getCount();
- if (paginationController.hasCurOffset(rowCount)) {
+ long startTime = System.nanoTime();
+ try {
+ if (firstTimeSeriesMetadata != null
+ && !isFileOverlapped()
+ && !firstTimeSeriesMetadata.isModified()) {
+ Filter queryFilter = scanOptions.getQueryFilter();
+ Statistics statistics = firstTimeSeriesMetadata.getStatistics();
+ if (queryFilter == null || queryFilter.allSatisfy(statistics)) {
+ long rowCount = statistics.getCount();
+ if (paginationController.hasCurOffset(rowCount)) {
+ skipCurrentFile();
+ paginationController.consumeOffset(rowCount);
+ }
+ } else if (!queryFilter.satisfy(statistics)) {
skipCurrentFile();
- paginationController.consumeOffset(rowCount);
}
- } else if (!queryFilter.satisfy(statistics)) {
- skipCurrentFile();
}
+ } finally {
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.FILTER_FIRST_TIMESERIES_METADATA, System.nanoTime() - startTime);
}
}
@@ -1116,7 +1169,7 @@ public class SeriesScanUtil {
return scanOptions.getGlobalTimeFilter();
}
- protected static class VersionPageReader {
+ protected class VersionPageReader {
private final PriorityMergeReader.MergeReaderPriority version;
private final IPageReader data;
@@ -1160,6 +1213,7 @@ public class SeriesScanUtil {
}
return tsBlock;
} finally {
+ long costTime = System.nanoTime() - startTime;
QUERY_METRICS.recordSeriesScanCost(
isAligned
? (isMem
@@ -1168,7 +1222,8 @@ public class SeriesScanUtil {
: (isMem
? BUILD_TSBLOCK_FROM_PAGE_READER_NONALIGNED_MEM
: BUILD_TSBLOCK_FROM_PAGE_READER_NONALIGNED_DISK),
- System.nanoTime() - startTime);
+ costTime);
+ QueryStatistics.getInstance().addCost(PAGE_READER, costTime);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index facc6b8ff90..947e76b7e15 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -131,6 +131,7 @@ import org.apache.iotdb.db.mpp.plan.statement.sys.ExplainStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.ShowQueriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.ShowVersionStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkTypeStatement;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.db.utils.TimePartitionUtils;
@@ -244,8 +245,10 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
} else {
schemaTree = schemaFetcher.fetchSchema(patternTree);
}
- QueryMetricsManager.getInstance()
- .recordPlanCost(SCHEMA_FETCHER, System.nanoTime() - startTime);
+ long endTime = System.nanoTime() - startTime;
+ QueryMetricsManager.getInstance().recordPlanCost(SCHEMA_FETCHER, endTime);
+ QueryStatistics.getInstance().addCost(QueryStatistics.SCHEMA_FETCHER, endTime);
+
logger.debug("[EndFetchSchema]");
// If there is no leaf node in the schema tree, the query should be completed immediately
@@ -1409,8 +1412,9 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
return partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
}
} finally {
- QueryMetricsManager.getInstance()
- .recordPlanCost(PARTITION_FETCHER, System.nanoTime() - startTime);
+ long costTime = System.nanoTime() - startTime;
+ QueryMetricsManager.getInstance().recordPlanCost(PARTITION_FETCHER, costTime);
+ QueryStatistics.getInstance().addCost(QueryStatistics.PARTITION_FETCHER, costTime);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index c9c71b85f69..e235048f9cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import static org.apache.iotdb.db.mpp.common.QueryId.mockQueryId;
import static org.apache.iotdb.db.mpp.metric.QueryPlanCostMetricSet.ANALYZER;
@@ -48,7 +49,9 @@ public class Analyzer {
new AnalyzeVisitor(partitionFetcher, schemaFetcher).process(statement, context);
if (statement.isQuery()) {
- QueryMetricsManager.getInstance().recordPlanCost(ANALYZER, System.nanoTime() - startTime);
+ long costTime = System.nanoTime() - startTime;
+ QueryMetricsManager.getInstance().recordPlanCost(ANALYZER, costTime);
+ QueryStatistics.getInstance().addCost(QueryStatistics.ANALYZER, costTime);
}
return analysis;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index b0f6861b55e..c8f3c5ac745 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -61,6 +61,7 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertBaseStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -134,6 +135,7 @@ public class QueryExecution implements IQueryExecution {
private long totalExecutionTime;
private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+ private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance();
private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
PerformanceOverviewMetrics.getInstance();
@@ -319,7 +321,10 @@ public class QueryExecution implements IQueryExecution {
syncInternalServiceClientManager,
asyncInternalServiceClientManager);
this.scheduler.start();
- PERFORMANCE_OVERVIEW_METRICS.recordScheduleCost(System.nanoTime() - startTime);
+
+ long endTime = System.nanoTime() - startTime;
+ PERFORMANCE_OVERVIEW_METRICS.recordScheduleCost(endTime);
+ QueryStatistics.getInstance().addCost(QueryStatistics.DISPATCHER, endTime);
}
// Use LogicalPlanner to do the logical query plan and logical optimization
@@ -341,7 +346,9 @@ public class QueryExecution implements IQueryExecution {
this.distributedPlan = planner.planFragments();
if (rawStatement.isQuery()) {
- QUERY_METRICS.recordPlanCost(DISTRIBUTION_PLANNER, System.nanoTime() - startTime);
+ long endTime = System.nanoTime() - startTime;
+ QUERY_METRICS.recordPlanCost(DISTRIBUTION_PLANNER, endTime);
+ QueryStatistics.getInstance().addCost(QueryStatistics.DISTRIBUTION_PLANNER, endTime);
}
if (isQuery() && logger.isDebugEnabled()) {
logger.debug(
@@ -456,7 +463,9 @@ public class QueryExecution implements IQueryExecution {
ListenableFuture<?> blocked = resultHandle.isBlocked();
blocked.get();
} finally {
- QUERY_METRICS.recordExecutionCost(WAIT_FOR_RESULT, System.nanoTime() - startTime);
+ long costTime = System.nanoTime() - startTime;
+ QUERY_METRICS.recordExecutionCost(WAIT_FOR_RESULT, costTime);
+ QUERY_STATISTICS.addCost(QueryStatistics.WAIT_FOR_RESULT, costTime);
}
if (!resultHandle.isFinished()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index d7b28b8cc83..d41f34abf5b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -38,6 +39,9 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CHECK_MEMORY;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.NODE_TO_OPERATOR;
+
/**
* Used to plan a fragment instance. Currently, we simply change it from PlanNode to executable
* Operator tree, but in the future, we may split one fragment instance into multiple pipeline to
@@ -45,6 +49,8 @@ import java.util.List;
*/
public class LocalExecutionPlanner {
+ private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance();
+
private static final Logger LOGGER = LoggerFactory.getLogger(LocalExecutionPlanner.class);
/** allocated memory for operator execution */
@@ -62,12 +68,17 @@ public class LocalExecutionPlanner {
// Generate pipelines, return the last pipeline data structure
// TODO Replace operator with operatorFactory to build multiple driver for one pipeline
+ long startTime = System.nanoTime();
Operator root = plan.accept(new OperatorTreeGenerator(), context);
+ long endTime = System.nanoTime();
+ QUERY_STATISTICS.addCost(NODE_TO_OPERATOR, endTime - startTime);
+ startTime = endTime;
// check whether current free memory is enough to execute current query
checkMemory(root, instanceContext.getStateMachine());
-
context.addPipelineDriverFactory(root, context.getDriverContext());
+ endTime = System.nanoTime();
+ QUERY_STATISTICS.addCost(CHECK_MEMORY, endTime - startTime);
instanceContext.setSourcePaths(collectSourcePaths(context));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index f55d695b2df..f3298af010a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.optimization.PlanOptimizer;
import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import java.util.List;
@@ -46,8 +47,9 @@ public class LogicalPlanner {
// optimize the query logical plan
if (analysis.getStatement().isQuery()) {
- QueryMetricsManager.getInstance()
- .recordPlanCost(LOGICAL_PLANNER, System.nanoTime() - startTime);
+ long endTime = System.nanoTime() - startTime;
+ QueryMetricsManager.getInstance().recordPlanCost(LOGICAL_PLANNER, endTime);
+ QueryStatistics.getInstance().addCost(QueryStatistics.LOGICAL_PLANNER, endTime);
for (PlanOptimizer optimizer : optimizers) {
rootNode = optimizer.optimize(rootNode, analysis, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index cb64acaf6bd..4e1ca4877af 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
@@ -124,7 +125,9 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
RpcUtils.getStatus(
TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage())));
} finally {
- QUERY_METRICS.recordExecutionCost(DISPATCH_READ, System.nanoTime() - startTime);
+ long endTime = System.nanoTime() - startTime;
+ QUERY_METRICS.recordExecutionCost(DISPATCH_READ, endTime);
+ QueryStatistics.getInstance().addCost(QueryStatistics.DISPATCH_READ, endTime);
}
}
return immediateFuture(new FragInstanceDispatchResult(true));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java
new file mode 100644
index 00000000000..70492f79d06
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.statistics;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.text.DecimalFormat;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+@ThreadSafe
+public class QueryStatistics {
+
+ private static final long QUERY_STATISTICS_PRINT_INTERVAL_IN_MS = 100_000;
+
+ private static final Logger QUERY_STATISTICS_LOGGER =
+ LoggerFactory.getLogger(IoTDBConstant.QUERY_STATISTICS_LOGGER_NAME);
+
+ private static final DecimalFormat format = new DecimalFormat("#,###");
+
+ private final AtomicBoolean tracing = new AtomicBoolean(true);
+
+ private final Map<String, OperationStatistic> operationStatistics = new ConcurrentHashMap<>();
+
+ public static final String LOCAL_EXECUTION_PLANNER = "LocalExecutionPlanner";
+
+ public static final String CREATE_FI_CONTEXT = "CreateFIContext";
+
+ public static final String CREATE_FI_EXEC = "CreateFIExec";
+
+ public static final String NODE_TO_OPERATOR = "ToOpTree";
+
+ public static final String CHECK_MEMORY = "CheckMem";
+
+ public static final String QUERY_RESOURCE_INIT = "QueryResourceInit";
+
+ public static final String INIT_SOURCE_OP = "InitSourceOp";
+
+ public static final String QUERY_RESOURCE_LIST = "TsFileList";
+ public static final String ADD_REFERENCE = "AddRef";
+
+ public static final String LOCAL_SOURCE_HANDLE_GET_TSBLOCK = "LocalSourceHandleGetTsBlock";
+
+ public static final String LOCAL_SOURCE_HANDLE_SER_TSBLOCK = "LocalSourceHandleSerializeTsBlock";
+
+ public static final String WAIT_FOR_RESULT = "WaitForResult";
+
+ public static final String AGG_SCAN_OPERATOR = "AbstractSeriesAggregationScanOperator";
+
+ public static final String CAL_NEXT_AGG_RES = "CalcNextAggRes";
+
+ public static final String CAL_AGG_FROM_RAW_DATA = "CalcAggFromRawData";
+
+ public static final String CAL_AGG_FROM_STAT = "CalcAggFromStat";
+
+ public static final String AGGREGATOR_PROCESS_TSBLOCK = "AggProcTsBlock";
+
+ public static final String CAL_AGG_FROM_PAGE = "CalcAggFromPage";
+
+ public static final String CAL_AGG_FROM_CHUNK = "CalcAggFromChunk";
+
+ public static final String CAL_AGG_FROM_FILE = "CalcAggFromFile";
+
+ public static final String CAL_AGG_FROM_FILE_STAT = "CalcAggFromFileStat";
+ public static final String CAL_AGG_FROM_CHUNK_STAT = "CalcAggFromChunkStat";
+ public static final String CAL_AGG_FROM_PAGE_STAT = "CalcAggFromPageStat";
+
+ public static final String BUILD_AGG_RES = "BuildAggRes";
+
+ public static final String PARSER = "Parser";
+
+ public static final String CREATE_QUERY_EXEC = "CreateQueryExec";
+
+ public static final String SERIALIZE_TSBLOCK = "SerTsBlock";
+
+ public static final String ANALYZER = "Analyzer";
+ public static final String SCHEMA_FETCHER = "SchemaFetcher";
+ public static final String PARTITION_FETCHER = "PartitionFetcher";
+ public static final String LOGICAL_PLANNER = "LogicalPlanner";
+ public static final String DISTRIBUTION_PLANNER = "DistributionPlanner";
+ public static final String DISPATCHER = "Dispatcher";
+
+ public static final String DISPATCH_READ = "DispatchRead";
+
+ public static final String DRIVER_CLOSE = "CloseDriver";
+
+ public static final String DRIVER_INTERNAL_PROCESS = "DriverInternalProcess";
+
+ public static final String SEND_TSBLOCK = "SendTsBlock";
+
+ public static final String RESERVE_MEMORY = "ReserveMem";
+
+ public static final String NOTIFY_NEW_TSBLOCK = "NotifyNewTsBlock";
+
+ public static final String NOTIFY_END = "NotifyEnd";
+
+ public static final String FREE_MEM = "FreeMem";
+
+ public static final String SINK_HANDLE_END_LISTENER = "SinkHandleEndListener";
+
+ public static final String SINK_HANDLE_FINISH_LISTENER = "SinkHandleFinishListener";
+
+ public static final String CHECK_AND_INVOKE_ON_FINISHED = "CheckAndInvokeOnFinished";
+
+ public static final String SET_NO_MORE_TSBLOCK = "SetNoMoreTsBlock";
+
+ public static final String SERVER_RPC_RT = "ServerRpcRT";
+
+ public static final String LOAD_TIME_SERIES_METADATA = "loadTimeSeriesMetadata";
+ public static final String LOAD_CHUNK_METADATA_LIST = "loadChunkMetadataList";
+ public static final String LOAD_PAGE_READER_LIST = "loadPageReaderList";
+ public static final String LOAD_CHUNK = "loadChunk";
+ public static final String INIT_PAGE_READERS = "initAllPageReaders";
+ public static final String PAGE_READER = "IPageReader";
+
+ public static final String HAS_NEXT_FILE = "hasNextFile";
+ public static final String FILTER_FIRST_TIMESERIES_METADATA = "filterFirstTimeSeriesMetadata";
+ public static final String FIND_END_TIME = "findEndTime";
+ public static final String PICK_FIRST_TIMESERIES_METADATA = "pickFirstTimeSeriesMetadata";
+
+ public static final String HAS_NEXT_CHUNK = "hasNextChunk";
+ public static final String FILTER_FIRST_CHUNK_METADATA = "filterFirstChunkMetadata";
+
+ public static final String HAS_NEXT_PAGE = "hasNextPage";
+ public static final String HAS_NEXT_OVERLAPPED_PAGE = "hasNextOverlappedPage";
+ public static final String MERGE_READER_ADD_READER = "mergeReader#addReader";
+ public static final String MERGE_READER_NEXT = "mergeReader#nextTimeValuePair";
+ public static final String MERGE_READER_UPDATE_HEAP = "mergeReader#updateHeap";
+ public static final String MERGE_READER_FILL_NULL_VALUE = "mergeReader#fillNullValue";
+ public static final String MERGE_READER_BUILD_RES = "mergeReader#buildRes";
+
+ private QueryStatistics() {
+ ScheduledExecutorService scheduledExecutor =
+ IoTDBThreadPoolFactory.newScheduledThreadPool(1, "Query-Statistics-Print");
+ ScheduledExecutorUtil.safelyScheduleAtFixedRate(
+ scheduledExecutor,
+ this::printQueryStatistics,
+ 0,
+ QUERY_STATISTICS_PRINT_INTERVAL_IN_MS,
+ TimeUnit.MILLISECONDS);
+ }
+
+ private void printQueryStatistics() {
+ if (tracing.get()) {
+
+ StringBuilder builder = new StringBuilder(System.lineSeparator());
+ builder
+ .append("Client Connection Thread:")
+ .append(System.lineSeparator())
+ .append(System.lineSeparator());
+
+ builder
+ .append("ServerRpcRT ")
+ .append(operationStatistics.get(SERVER_RPC_RT))
+ .append(System.lineSeparator());
+ builder
+ .append("|___CreateQueryExec ")
+ .append(operationStatistics.get(CREATE_QUERY_EXEC))
+ .append(System.lineSeparator());
+ builder
+ .append("| |___Parser ")
+ .append(operationStatistics.get(PARSER))
+ .append(System.lineSeparator());
+ builder
+ .append("| |___Analyzer ")
+ .append(operationStatistics.get(ANALYZER))
+ .append(System.lineSeparator());
+ builder
+ .append("| | |___PartitionFetcher ")
+ .append(operationStatistics.get(PARTITION_FETCHER))
+ .append(System.lineSeparator());
+ builder
+ .append("| | |___SchemaFetcher ")
+ .append(operationStatistics.get(SCHEMA_FETCHER))
+ .append(System.lineSeparator());
+ builder
+ .append("| |___LogicalPlanner ")
+ .append(operationStatistics.get(LOGICAL_PLANNER))
+ .append(System.lineSeparator());
+ builder
+ .append("| |___DistributionPlanner ")
+ .append(operationStatistics.get(DISTRIBUTION_PLANNER))
+ .append(System.lineSeparator());
+ builder
+ .append("| |___Dispatcher ")
+ .append(operationStatistics.get(DISPATCHER))
+ .append(System.lineSeparator());
+ builder
+ .append("| |___DispatchRead ")
+ .append(operationStatistics.get(DISPATCH_READ))
+ .append(System.lineSeparator());
+ builder
+ .append("| |___LocalExecPlanner ")
+ .append(operationStatistics.get(LOCAL_EXECUTION_PLANNER))
+ .append(System.lineSeparator());
+ builder
+ .append("| |___FIContext ")
+ .append(operationStatistics.get(CREATE_FI_CONTEXT))
+ .append(System.lineSeparator());
+ builder
+ .append("| |___ToOpTree ")
+ .append(operationStatistics.get(NODE_TO_OPERATOR))
+ .append(System.lineSeparator());
+ builder
+ .append("| |___CheckMem ")
+ .append(operationStatistics.get(CHECK_MEMORY))
+ .append(System.lineSeparator());
+ builder
+ .append("| |___FIExec ")
+ .append(operationStatistics.get(CREATE_FI_EXEC))
+ .append(System.lineSeparator());
+ builder
+ .append("|___SerTsBlock ")
+ .append(operationStatistics.get(SERIALIZE_TSBLOCK))
+ .append(System.lineSeparator());
+ builder
+ .append(" |___WaitForResult ")
+ .append(operationStatistics.get(WAIT_FOR_RESULT))
+ .append(System.lineSeparator());
+ builder
+ .append(" |___GetTsBlock ")
+ .append(operationStatistics.get(LOCAL_SOURCE_HANDLE_GET_TSBLOCK))
+ .append(System.lineSeparator());
+ builder
+ .append(" |___FreeMem ")
+ .append(operationStatistics.get(FREE_MEM))
+ .append(System.lineSeparator());
+
+ builder
+ .append("Query Execution Thread:")
+ .append(System.lineSeparator())
+ .append(System.lineSeparator());
+
+ builder
+ .append("|___QueryResourceInit ")
+ .append(operationStatistics.get(QUERY_RESOURCE_INIT))
+ .append(System.lineSeparator());
+ builder
+ .append("| |___TsFileList ")
+ .append(operationStatistics.get(QUERY_RESOURCE_LIST))
+ .append(System.lineSeparator());
+ builder
+ .append("| |___AddRef ")
+ .append(operationStatistics.get(ADD_REFERENCE))
+ .append(System.lineSeparator());
+ builder
+ .append("| |___InitSourceOp ")
+ .append(operationStatistics.get(INIT_SOURCE_OP))
+ .append(System.lineSeparator());
+ builder
+ .append("|___DriverInternalProcess ")
+ .append(operationStatistics.get(DRIVER_INTERNAL_PROCESS))
+ .append(System.lineSeparator());
+ builder
+ .append("| |___AggScanOperator ")
+ .append(operationStatistics.get(AGG_SCAN_OPERATOR))
+ .append(System.lineSeparator());
+
+ builder.append("| | |[FileLoaderInterface]").append(System.lineSeparator());
+ builder
+ .append("| | |___loadTSMetadata ")
+ .append(operationStatistics.get(LOAD_TIME_SERIES_METADATA))
+ .append(System.lineSeparator());
+ builder
+ .append("| | |___loadChunkMetaList ")
+ .append(operationStatistics.get(LOAD_CHUNK_METADATA_LIST))
+ .append(System.lineSeparator());
+ builder
+ .append("| | |___loadPageReaderList ")
+ .append(operationStatistics.get(LOAD_PAGE_READER_LIST))
+ .append(System.lineSeparator());
+ builder
+ .append("| | | |___loadChunk ")
+ .append(operationStatistics.get(LOAD_CHUNK))
+ .append(System.lineSeparator());
+ builder
+ .append("| | | |___initPageReaders ")
+ .append(operationStatistics.get(INIT_PAGE_READERS))
+ .append(System.lineSeparator());
+ builder
+ .append("| | |___pageReader ")
+ .append(operationStatistics.get(PAGE_READER))
+ .append(System.lineSeparator());
+
+ builder.append("| | |[AggregatorInterface]").append(System.lineSeparator());
+ builder
+ .append("| | |___AggFromStat ")
+ .append(operationStatistics.get(CAL_AGG_FROM_STAT))
+ .append(System.lineSeparator());
+ builder
+ .append("| | |___AggFromRawData ")
+ .append(operationStatistics.get(CAL_AGG_FROM_RAW_DATA))
+ .append(System.lineSeparator());
+ builder
+ .append("| | | |___AggProcTsBlock ")
+ .append(operationStatistics.get(AGGREGATOR_PROCESS_TSBLOCK))
+ .append(System.lineSeparator());
+
+ builder.append("| | |[OperatorMethods]").append(System.lineSeparator());
+ builder
+ .append("| | |___CalcNextAggRes ")
+ .append(operationStatistics.get(CAL_NEXT_AGG_RES))
+ .append(System.lineSeparator());
+ builder
+ .append("| | |___AggFromFile ")
+ .append(operationStatistics.get(CAL_AGG_FROM_FILE))
+ .append(System.lineSeparator());
+ builder
+ .append("| | | |___TryAggFromFileStat ")
+ .append(operationStatistics.get(CAL_AGG_FROM_FILE_STAT))
+ .append(System.lineSeparator());
+ builder
+ .append("| | |___AggFromChunk ")
+ .append(operationStatistics.get(CAL_AGG_FROM_CHUNK))
+ .append(System.lineSeparator());
+ builder
+ .append("| | | |___TryAggFromChunkStat ")
+ .append(operationStatistics.get(CAL_AGG_FROM_CHUNK_STAT))
+ .append(System.lineSeparator());
+ builder
+ .append("| | |___AggFromPage ")
+ .append(operationStatistics.get(CAL_AGG_FROM_PAGE))
+ .append(System.lineSeparator());
+ builder
+ .append("| | | |___TryAggFromPageStat ")
+ .append(operationStatistics.get(CAL_AGG_FROM_PAGE_STAT))
+ .append(System.lineSeparator());
+ builder
+ .append("| | |___BuildAggRes ")
+ .append(operationStatistics.get(BUILD_AGG_RES))
+ .append(System.lineSeparator());
+
+ builder.append("| | |[SeriesScanUtilCost]").append(System.lineSeparator());
+ builder
+ .append("| | |___hasNextFile ")
+ .append(operationStatistics.get(HAS_NEXT_FILE))
+ .append(System.lineSeparator());
+ builder
+ .append("| | | |___findEndTime ")
+ .append(operationStatistics.get(FIND_END_TIME))
+ .append(System.lineSeparator());
+ builder
+ .append("| | | |___pickFirstTimeSeriesMetadata ")
+ .append(operationStatistics.get(PICK_FIRST_TIMESERIES_METADATA))
+ .append(System.lineSeparator());
+ builder
+ .append("| | | |___filterFirstTimeSeriesMetadata ")
+ .append(operationStatistics.get(FILTER_FIRST_TIMESERIES_METADATA))
+ .append(System.lineSeparator());
+ builder
+ .append("| | |___hasNextChunk ")
+ .append(operationStatistics.get(HAS_NEXT_CHUNK))
+ .append(System.lineSeparator());
+ builder
+ .append("| | | |___filterFirstChunkMetadata ")
+ .append(operationStatistics.get(FILTER_FIRST_CHUNK_METADATA))
+ .append(System.lineSeparator());
+ builder
+ .append("| | |___hasNextPage ")
+ .append(operationStatistics.get(HAS_NEXT_PAGE))
+ .append(System.lineSeparator());
+ builder
+ .append("| | |___hasNextOverlappedPage ")
+ .append(operationStatistics.get(HAS_NEXT_OVERLAPPED_PAGE))
+ .append(System.lineSeparator());
+ builder
+ .append("| | | |___mergeReader#nextTimeValuePair ")
+ .append(operationStatistics.get(MERGE_READER_NEXT))
+ .append(System.lineSeparator());
+ builder
+ .append("| | | |___mergeReader#updateHeap ")
+ .append(operationStatistics.get(MERGE_READER_UPDATE_HEAP))
+ .append(System.lineSeparator());
+ builder
+ .append("| | | |___mergeReader#fillNullValue ")
+ .append(operationStatistics.get(MERGE_READER_FILL_NULL_VALUE))
+ .append(System.lineSeparator());
+ builder
+ .append("| | | |___buildTsblock ")
+ .append(operationStatistics.get(MERGE_READER_BUILD_RES))
+ .append(System.lineSeparator());
+
+ builder
+ .append("| |___SendTsBlock ")
+ .append(operationStatistics.get(SEND_TSBLOCK))
+ .append(System.lineSeparator());
+ builder
+ .append("| |___ReserveMem ")
+ .append(operationStatistics.get(RESERVE_MEMORY))
+ .append(System.lineSeparator());
+ builder
+ .append("| |___NotifyNewTsBlock ")
+ .append(operationStatistics.get(NOTIFY_NEW_TSBLOCK))
+ .append(System.lineSeparator());
+ builder
+ .append("|___SetNoMoreTsBlock ")
+ .append(operationStatistics.get(SET_NO_MORE_TSBLOCK))
+ .append(System.lineSeparator());
+ builder
+ .append(" |___NotifyEnd ")
+ .append(operationStatistics.get(NOTIFY_END))
+ .append(System.lineSeparator());
+ builder
+ .append(" |___EndListener ")
+ .append(operationStatistics.get(SINK_HANDLE_END_LISTENER))
+ .append(System.lineSeparator());
+ builder
+ .append(" |___CkAndInvOnFinished ")
+ .append(operationStatistics.get(CHECK_AND_INVOKE_ON_FINISHED))
+ .append(System.lineSeparator());
+ builder
+ .append(" |___FinishListener ")
+ .append(operationStatistics.get(SINK_HANDLE_FINISH_LISTENER))
+ .append(System.lineSeparator());
+
+ QUERY_STATISTICS_LOGGER.info(builder.toString());
+ QUERY_STATISTICS_LOGGER.info("");
+ }
+ }
+
+ public static QueryStatistics getInstance() {
+ return QueryStatisticsHolder.INSTANCE;
+ }
+
+ public void addCost(String key, long costTimeInNanos) {
+ if (tracing.get()) {
+ operationStatistics
+ .computeIfAbsent(key, k -> new OperationStatistic())
+ .addTimeCost(costTimeInNanos);
+ }
+ }
+
+ private static class OperationStatistic {
+ // accumulated operation time in ns
+ private final AtomicLong totalTime;
+ private final AtomicLong totalCount;
+
+ public OperationStatistic() {
+ this.totalTime = new AtomicLong(0);
+ this.totalCount = new AtomicLong(0);
+ }
+
+ public void addTimeCost(long costTimeInNanos) {
+ totalTime.addAndGet(costTimeInNanos);
+ totalCount.incrementAndGet();
+ }
+
+ @Override
+ public String toString() {
+ long time = totalTime.get() / 1_000;
+ long count = totalCount.get();
+ return "{"
+ + "totalTime="
+ + format.format(time)
+ + "us"
+ + ", totalCount="
+ + format.format(count)
+ + ", avgOpTime="
+ + format.format(time / count)
+ + "us"
+ + '}';
+ }
+ }
+
+ private static class QueryStatisticsHolder {
+
+ private static final QueryStatistics INSTANCE = new QueryStatistics();
+
+ private QueryStatisticsHolder() {}
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskAlignedChunkLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskAlignedChunkLoader.java
index b433641af08..efb0ca8dd24 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskAlignedChunkLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskAlignedChunkLoader.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.reader.chunk;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
@@ -72,7 +73,9 @@ public class DiskAlignedChunkLoader implements IChunkLoader {
long t2 = System.nanoTime();
IChunkReader chunkReader = new AlignedChunkReader(timeChunk, valueChunkList, timeFilter);
- QUERY_METRICS.recordSeriesScanCost(INIT_CHUNK_READER_ALIGNED_DISK, System.nanoTime() - t2);
+ long costTime = System.nanoTime() - t2;
+ QUERY_METRICS.recordSeriesScanCost(INIT_CHUNK_READER_ALIGNED_DISK, costTime);
+ QueryStatistics.getInstance().addCost(QueryStatistics.INIT_PAGE_READERS, costTime);
return chunkReader;
} finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkLoader.java
index 85ee83bee42..b038737719a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkLoader.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.reader.chunk;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.read.common.Chunk;
@@ -65,7 +66,9 @@ public class DiskChunkLoader implements IChunkLoader {
long t2 = System.nanoTime();
IChunkReader chunkReader = new ChunkReader(chunk, timeFilter);
- QUERY_METRICS.recordSeriesScanCost(INIT_CHUNK_READER_NONALIGNED_DISK, System.nanoTime() - t2);
+ long costTime = System.nanoTime() - t2;
+ QUERY_METRICS.recordSeriesScanCost(INIT_CHUNK_READER_NONALIGNED_DISK, costTime);
+ QueryStatistics.getInstance().addCost(QueryStatistics.INIT_PAGE_READERS, costTime);
return chunkReader;
} finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/AlignedPriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/AlignedPriorityMergeReader.java
index 9d5875abdee..801aaf85d7e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/AlignedPriorityMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/AlignedPriorityMergeReader.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.query.reader.universal;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -35,7 +36,13 @@ public class AlignedPriorityMergeReader extends PriorityMergeReader {
*/
@Override
protected void fillNullValue(TimeValuePair v, TimeValuePair c) {
- fillNullValueInAligned(v, c);
+ long startTime = System.nanoTime();
+ try {
+ fillNullValueInAligned(v, c);
+ } finally {
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.MERGE_READER_FILL_NULL_VALUE, System.nanoTime() - startTime);
+ }
}
static void fillNullValueInAligned(TimeValuePair v, TimeValuePair c) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
index 471090bb627..475b0d0a286 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
@@ -18,12 +18,12 @@
*/
package org.apache.iotdb.db.query.reader.universal;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import java.io.IOException;
-import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
@@ -47,21 +47,6 @@ public class PriorityMergeReader implements IPointReader {
});
}
- // only used in external sort, need to refactor later
- public PriorityMergeReader(List<IPointReader> prioritySeriesReaders, int startPriority)
- throws IOException {
- heap =
- new PriorityQueue<>(
- (o1, o2) -> {
- int timeCompare =
- Long.compare(o1.timeValuePair.getTimestamp(), o2.timeValuePair.getTimestamp());
- return timeCompare != 0 ? timeCompare : o2.priority.compareTo(o1.priority);
- });
- for (IPointReader reader : prioritySeriesReaders) {
- addReader(reader, startPriority++);
- }
- }
-
public void addReader(IPointReader reader, long priority) throws IOException {
if (reader.hasNextTimeValuePair()) {
heap.add(
@@ -74,11 +59,17 @@ public class PriorityMergeReader implements IPointReader {
public void addReader(
IPointReader reader, MergeReaderPriority priority, long endTime, QueryContext context)
throws IOException {
- if (reader.hasNextTimeValuePair()) {
- heap.add(new Element(reader, reader.nextTimeValuePair(), priority));
- currentReadStopTime = Math.max(currentReadStopTime, endTime);
- } else {
- reader.close();
+ long startTime = System.nanoTime();
+ try {
+ if (reader.hasNextTimeValuePair()) {
+ heap.add(new Element(reader, reader.nextTimeValuePair(), priority));
+ currentReadStopTime = Math.max(currentReadStopTime, endTime);
+ } else {
+ reader.close();
+ }
+ } finally {
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.MERGE_READER_ADD_READER, System.nanoTime() - startTime);
}
}
@@ -93,19 +84,25 @@ public class PriorityMergeReader implements IPointReader {
@Override
public TimeValuePair nextTimeValuePair() throws IOException {
- Element top = heap.poll();
- TimeValuePair ret = top.getTimeValuePair();
- TimeValuePair topNext = null;
- if (top.hasNext()) {
- top.next();
- topNext = top.currPair();
- }
- updateHeap(ret, topNext);
- if (topNext != null) {
- top.timeValuePair = topNext;
- heap.add(top);
+ long startTime = System.nanoTime();
+ try {
+ Element top = heap.poll();
+ TimeValuePair ret = top.getTimeValuePair();
+ TimeValuePair topNext = null;
+ if (top.hasNext()) {
+ top.next();
+ topNext = top.currPair();
+ }
+ updateHeap(ret, topNext);
+ if (topNext != null) {
+ top.timeValuePair = topNext;
+ heap.add(top);
+ }
+ return ret;
+ } finally {
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.MERGE_READER_NEXT, System.nanoTime() - startTime);
}
- return ret;
}
@Override
@@ -119,29 +116,35 @@ public class PriorityMergeReader implements IPointReader {
* TimeValuePair
*/
protected void updateHeap(TimeValuePair ret, TimeValuePair topNext) throws IOException {
- long topTime = ret.getTimestamp();
- long topNextTime = (topNext == null ? Long.MAX_VALUE : topNext.getTimestamp());
- while (!heap.isEmpty() && heap.peek().currTime() == topTime) {
- Element e = heap.poll();
- fillNullValue(ret, e.getTimeValuePair());
- if (!e.hasNext()) {
- e.reader.close();
- continue;
- }
- e.next();
- if (e.currTime() == topNextTime) {
- // if the next value of the peek will be overwritten by the next of the top, skip it
- fillNullValue(topNext, e.getTimeValuePair());
- if (e.hasNext()) {
- e.next();
- heap.add(e);
+ long startTime = System.nanoTime();
+ try {
+ long topTime = ret.getTimestamp();
+ long topNextTime = (topNext == null ? Long.MAX_VALUE : topNext.getTimestamp());
+ while (!heap.isEmpty() && heap.peek().currTime() == topTime) {
+ Element e = heap.poll();
+ fillNullValue(ret, e.getTimeValuePair());
+ if (!e.hasNext()) {
+ e.reader.close();
+ continue;
+ }
+ e.next();
+ if (e.currTime() == topNextTime) {
+ // if the next value of the peek will be overwritten by the next of the top, skip it
+ fillNullValue(topNext, e.getTimeValuePair());
+ if (e.hasNext()) {
+ e.next();
+ heap.add(e);
+ } else {
+ // the chunk is end
+ e.close();
+ }
} else {
- // the chunk is end
- e.close();
+ heap.add(e);
}
- } else {
- heap.add(e);
}
+ } finally {
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.MERGE_READER_UPDATE_HEAP, System.nanoTime() - startTime);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 4465c6f0284..f2b54b05c16 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.template.CreateSchemaTemp
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.DropSchemaTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.SetSchemaTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.UnsetSchemaTemplateStatement;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.query.control.clientsession.IClientSession;
import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
@@ -133,6 +134,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.SERVER_RPC_RT;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
@@ -164,16 +166,22 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
private static final SelectResult SELECT_RESULT =
(resp, queryExecution, fetchSize) -> {
+ long startTime = System.nanoTime();
Pair<List<ByteBuffer>, Boolean> pair =
QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, fetchSize);
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.SERIALIZE_TSBLOCK, System.nanoTime() - startTime);
resp.setQueryResult(pair.left);
return pair.right;
};
private static final SelectResult OLD_SELECT_RESULT =
(resp, queryExecution, fetchSize) -> {
+ long startTime = System.nanoTime();
Pair<TSQueryDataSet, Boolean> pair =
QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, fetchSize);
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.SERIALIZE_TSBLOCK, System.nanoTime() - startTime);
resp.setQueryDataSet(pair.left);
return pair.right;
};
@@ -194,6 +202,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
}
long startTime = System.currentTimeMillis();
+ long startTimeInNano = System.nanoTime();
StatementType statementType = null;
Throwable t = null;
try {
@@ -204,6 +213,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
RpcUtils.getStatus(
TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported"));
}
+
+ if (s.isQuery()) {
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.PARSER, System.nanoTime() - startTimeInNano);
+ }
+
// permission check
TSStatus status = AuthorityChecker.checkAuthority(s, clientSession);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -216,6 +231,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
}
queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
+ long start = System.nanoTime();
// create and cache dataset
ExecutionResult result =
COORDINATOR.execute(
@@ -226,6 +242,10 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
partitionFetcher,
schemaFetcher,
req.getTimeout());
+ if (s.isQuery()) {
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.CREATE_QUERY_EXEC, System.nanoTime() - start);
+ }
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
@@ -489,7 +509,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSExecuteStatementResp executeQueryStatementV2(TSExecuteStatementReq req) {
- return executeStatementV2(req);
+ long startTime = System.nanoTime();
+ try {
+ return executeStatementV2(req);
+ } finally {
+ QueryStatistics.getInstance().addCost(SERVER_RPC_RT, System.nanoTime() - startTime);
+ }
}
@Override
@@ -519,6 +544,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) {
+ long startTimeNanos = System.nanoTime();
long startTime = System.currentTimeMillis();
boolean finished = false;
StatementType statementType = null;
@@ -570,6 +596,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
COORDINATOR.cleanupQueryExecution(req.queryId, t);
}
SESSION_MANAGER.updateIdleTime();
+ QueryStatistics.getInstance().addCost(SERVER_RPC_RT, System.nanoTime() - startTimeNanos);
}
}
@@ -1024,7 +1051,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) {
- return executeStatement(req);
+ long startTime = System.nanoTime();
+ try {
+ return executeStatement(req);
+ } finally {
+ QueryStatistics.getInstance().addCost(SERVER_RPC_RT, System.nanoTime() - startTime);
+ }
}
@Override
@@ -1035,6 +1067,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
boolean finished = false;
+ long startTimeNanos = System.nanoTime();
long startTime = System.currentTimeMillis();
StatementType statementType = null;
Throwable t = null;
@@ -1085,6 +1118,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
COORDINATOR.cleanupQueryExecution(req.queryId, t);
}
SESSION_MANAGER.updateIdleTime();
+ QueryStatistics.getInstance().addCost(SERVER_RPC_RT, System.nanoTime() - startTimeNanos);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 1dc6500ac4f..6cd3a6d50c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.reader.chunk.metadata.DiskAlignedChunkMetadataLoader;
import org.apache.iotdb.db.query.reader.chunk.metadata.DiskChunkMetadataLoader;
@@ -175,11 +176,12 @@ public class FileLoaderUtils {
}
return timeSeriesMetadata;
} finally {
+ long costTime = System.nanoTime() - t1;
QUERY_METRICS.recordSeriesScanCost(
loadFromMem
? LOAD_TIMESERIES_METADATA_NONALIGNED_MEM
: LOAD_TIMESERIES_METADATA_NONALIGNED_DISK,
- System.nanoTime() - t1);
+ costTime);
}
}
@@ -286,11 +288,13 @@ public class FileLoaderUtils {
}
return alignedTimeSeriesMetadata;
} finally {
+ long costTime = System.nanoTime() - t1;
QUERY_METRICS.recordSeriesScanCost(
loadFromMem
? LOAD_TIMESERIES_METADATA_ALIGNED_MEM
: LOAD_TIMESERIES_METADATA_ALIGNED_DISK,
- System.nanoTime() - t1);
+ costTime);
+ QueryStatistics.getInstance().addCost(QueryStatistics.LOAD_TIME_SERIES_METADATA, costTime);
}
}
@@ -301,7 +305,11 @@ public class FileLoaderUtils {
*/
public static List<IChunkMetadata> loadChunkMetadataList(ITimeSeriesMetadata timeSeriesMetadata)
throws IOException {
- return timeSeriesMetadata.loadChunkMetadataList();
+ long startTime = System.nanoTime();
+ List<IChunkMetadata> chunkMetadataList = timeSeriesMetadata.loadChunkMetadataList();
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.LOAD_CHUNK_METADATA_LIST, System.nanoTime() - startTime);
+ return chunkMetadataList;
}
/**
@@ -312,11 +320,17 @@ public class FileLoaderUtils {
*/
public static List<IPageReader> loadPageReaderList(
IChunkMetadata chunkMetaData, Filter timeFilter) throws IOException {
- if (chunkMetaData == null) {
- throw new IOException("Can't init null chunkMeta");
+ long startTime = System.nanoTime();
+ try {
+ if (chunkMetaData == null) {
+ throw new IOException("Can't init null chunkMeta");
+ }
+ IChunkLoader chunkLoader = chunkMetaData.getChunkLoader();
+ IChunkReader chunkReader = chunkLoader.getChunkReader(chunkMetaData, timeFilter);
+ return chunkReader.loadPageReaderList();
+ } finally {
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.LOAD_PAGE_READER_LIST, System.nanoTime() - startTime);
}
- IChunkLoader chunkLoader = chunkMetaData.getChunkLoader();
- IChunkReader chunkReader = chunkLoader.getChunkReader(chunkMetaData, timeFilter);
- return chunkReader.loadPageReaderList();
}
}