You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/11/07 06:47:22 UTC
[iotdb] 01/03: Add statistics for query
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch QueryMetrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d15fbff6c7475f36019a5c9dfa37a1eb3b314590
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Nov 7 14:45:17 2022 +0800
Add statistics for query
---
.../fragment/FragmentInstanceContext.java | 10 ++++-
.../fragment/FragmentInstanceManager.java | 7 ++++
.../db/mpp/execution/operator/OperatorContext.java | 11 ++++-
.../operator/process/FilterAndProjectOperator.java | 34 +++++++++------
.../process/RawDataAggregationOperator.java | 11 ++++-
.../process/SingleInputAggregationOperator.java | 23 ++++++++---
.../process/SlidingWindowAggregationOperator.java | 13 +++++-
.../AbstractSeriesAggregationScanOperator.java | 42 ++++++++++---------
.../operator/source/AlignedSeriesScanOperator.java | 6 ++-
.../operator/source/SeriesScanOperator.java | 6 ++-
.../execution/operator/source/SeriesScanUtil.java | 3 ++
.../iotdb/db/mpp/statistics/QueryStatistics.java | 16 ++++++++
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 48 +++++++++++++---------
13 files changed, 167 insertions(+), 63 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 286623529f..6ecb01df2f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.common.SessionInfo;
import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.db.query.context.QueryContext;
import org.slf4j.Logger;
@@ -42,6 +43,9 @@ public class FragmentInstanceContext extends QueryContext {
private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstanceContext.class);
private static final long END_TIME_INITIAL_VALUE = -1L;
+
+ private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance();
+
private final FragmentInstanceId id;
// TODO if we split one fragment instance into multiple pipelines to run, we need to replace it
@@ -157,7 +161,7 @@ public class FragmentInstanceContext extends QueryContext {
}
OperatorContext operatorContext =
- new OperatorContext(operatorId, planNodeId, operatorType, this);
+ new OperatorContext(operatorId, planNodeId, operatorType, this, QUERY_STATISTICS);
operatorContexts.add(operatorContext);
return operatorContext;
}
@@ -223,4 +227,8 @@ public class FragmentInstanceContext extends QueryContext {
public SessionInfo getSessionInfo() {
return sessionInfo;
}
+
+ public void addOperationTime(String key, long costTimeInNanos) {
+ QUERY_STATISTICS.addCost(key, costTimeInNanos);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 1d7499560f..318476e43d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.mpp.execution.schedule.DriverScheduler;
import org.apache.iotdb.db.mpp.execution.schedule.IDriverScheduler;
import org.apache.iotdb.db.mpp.plan.planner.LocalExecutionPlanner;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.db.utils.SetThreadName;
import io.airlift.stats.CounterStat;
@@ -47,6 +48,7 @@ import java.util.concurrent.TimeoutException;
import static java.util.Objects.requireNonNull;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceExecution.createFragmentInstanceExecution;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.LOCAL_EXECUTION_PLANNER;
public class FragmentInstanceManager {
@@ -68,6 +70,8 @@ public class FragmentInstanceManager {
private static final long QUERY_TIMEOUT_MS =
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
+ private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance();
+
public static FragmentInstanceManager getInstance() {
return FragmentInstanceManager.InstanceHolder.INSTANCE;
}
@@ -96,6 +100,7 @@ public class FragmentInstanceManager {
FragmentInstance instance, DataRegion dataRegion) {
FragmentInstanceId instanceId = instance.getId();
+ long startTime = System.nanoTime();
try (SetThreadName fragmentInstanceName = new SetThreadName(instanceId.getFullId())) {
FragmentInstanceExecution execution =
instanceExecution.computeIfAbsent(
@@ -135,6 +140,8 @@ public class FragmentInstanceManager {
});
return execution != null ? execution.getInstanceInfo() : createFailedInstanceInfo(instanceId);
+ } finally {
+ QUERY_STATISTICS.addCost(LOCAL_EXECUTION_PLANNER, System.nanoTime() - startTime);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java
index 447066f4c8..bc4a718422 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.operator;
import org.apache.iotdb.db.mpp.common.SessionInfo;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import io.airlift.units.Duration;
@@ -38,17 +39,21 @@ public class OperatorContext {
private final String operatorType;
private final FragmentInstanceContext instanceContext;
+ private final QueryStatistics queryStatistics;
+
private Duration maxRunTime;
public OperatorContext(
int operatorId,
PlanNodeId planNodeId,
String operatorType,
- FragmentInstanceContext instanceContext) {
+ FragmentInstanceContext instanceContext,
+ QueryStatistics queryStatistics) {
this.operatorId = operatorId;
this.planNodeId = planNodeId;
this.operatorType = operatorType;
this.instanceContext = instanceContext;
+ this.queryStatistics = queryStatistics;
}
public int getOperatorId() {
@@ -75,6 +80,10 @@ public class OperatorContext {
return instanceContext.getSessionInfo();
}
+ public void addOperatorTime(String key, long costTimeInNanos) {
+ queryStatistics.addCost(key, costTimeInNanos);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
index eed0895a47..4c1ba15a47 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
@@ -43,19 +43,21 @@ import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.FILTER_AND_PROJECT_OPERATOR;
+
public class FilterAndProjectOperator implements ProcessOperator {
private final Operator inputOperator;
- private List<LeafColumnTransformer> filterLeafColumnTransformerList;
+ private final List<LeafColumnTransformer> filterLeafColumnTransformerList;
- private ColumnTransformer filterOutputTransformer;
+ private final ColumnTransformer filterOutputTransformer;
- private List<ColumnTransformer> commonTransformerList;
+ private final List<ColumnTransformer> commonTransformerList;
- private List<LeafColumnTransformer> projectLeafColumnTransformerList;
+ private final List<LeafColumnTransformer> projectLeafColumnTransformerList;
- private List<ColumnTransformer> projectOutputTransformerList;
+ private final List<ColumnTransformer> projectOutputTransformerList;
private final TsBlockBuilder filterTsBlockBuilder;
@@ -101,17 +103,23 @@ public class FilterAndProjectOperator implements ProcessOperator {
return null;
}
- if (!hasFilter) {
- return getTransformedTsBlock(input);
- }
+ long startTime = System.nanoTime();
- TsBlock filterResult = getFilterTsBlock(input);
+ try {
+ if (!hasFilter) {
+ return getTransformedTsBlock(input);
+ }
+
+ TsBlock filterResult = getFilterTsBlock(input);
- // contains non-mappable udf, we leave calculation for TransformOperator
- if (hasNonMappableUDF) {
- return filterResult;
+ // contains non-mappable udf, we leave calculation for TransformOperator
+ if (hasNonMappableUDF) {
+ return filterResult;
+ }
+ return getTransformedTsBlock(filterResult);
+ } finally {
+ operatorContext.addOperatorTime(FILTER_AND_PROJECT_OPERATOR, System.nanoTime() - startTime);
}
- return getTransformedTsBlock(filterResult);
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index 30488c18d3..bce16f89d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -68,7 +68,11 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
@Override
protected boolean calculateNextAggregationResult() {
+ long startTime = System.nanoTime();
+
while (!calculateFromRawData()) {
+ long endTime = System.nanoTime();
+ costTime += (endTime - startTime);
inputTsBlock = null;
// NOTE: child.next() can only be invoked once
@@ -79,19 +83,24 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
// if child still has next but can't be invoked now
return false;
} else {
+ startTime = System.nanoTime();
// If there are no points belong to last window, the last window will not
// initialize window and aggregators
if (!windowManager.isCurWindowInit()) {
initWindowAndAggregators();
}
+ endTime = System.nanoTime();
+ costTime += (endTime - startTime);
+ startTime = endTime;
break;
}
+ startTime = System.nanoTime();
}
updateResultTsBlock();
// Step into next window
windowManager.next();
-
+ costTime += (System.nanoTime() - startTime);
return true;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
index 16071aea1e..894673d577 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
@@ -33,6 +33,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.SINGLE_INPUT_AGG_OPERATOR;
+
public abstract class SingleInputAggregationOperator implements ProcessOperator {
protected final OperatorContext operatorContext;
@@ -50,6 +52,8 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
protected final long maxRetainedSize;
protected final long maxReturnSize;
+ protected long costTime = 0L;
+
public SingleInputAggregationOperator(
OperatorContext operatorContext,
List<Aggregator> aggregators,
@@ -97,12 +101,19 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
}
}
- if (resultTsBlockBuilder.getPositionCount() > 0) {
- TsBlock resultTsBlock = resultTsBlockBuilder.build();
- resultTsBlockBuilder.reset();
- return resultTsBlock;
- } else {
- return null;
+ start = System.nanoTime();
+ try {
+ if (resultTsBlockBuilder.getPositionCount() > 0) {
+ TsBlock resultTsBlock = resultTsBlockBuilder.build();
+ resultTsBlockBuilder.reset();
+ return resultTsBlock;
+ } else {
+ return null;
+ }
+ } finally {
+ operatorContext.addOperatorTime(
+ SINGLE_INPUT_AGG_OPERATOR, costTime + (System.nanoTime() - start));
+ costTime = 0;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
index 76499849cb..8a8ec2c813 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -65,6 +65,7 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper
@Override
protected boolean calculateNextAggregationResult() {
+ long startTime = System.nanoTime();
if (curTimeRange == null && timeRangeIterator.hasNextTimeRange()) {
// move to next time window
curTimeRange = timeRangeIterator.nextTimeRange();
@@ -75,7 +76,12 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper
}
}
+ long endTime = System.nanoTime();
+ costTime += (endTime - startTime);
+ startTime = endTime;
+
while (!isCalculationDone()) {
+ costTime += (System.nanoTime() - startTime);
if (inputTsBlock == null) {
// NOTE: child.next() can only be invoked once
if (child.hasNext() && canCallNext) {
@@ -85,15 +91,20 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper
// if child still has next but can't be invoked now
return false;
} else {
+ startTime = System.nanoTime();
break;
}
}
-
+ startTime = System.nanoTime();
calculateFromCachedData();
+ endTime = System.nanoTime();
+ costTime += (endTime - startTime);
+ startTime = endTime;
}
// update result using aggregators
updateResultTsBlock();
+ costTime += (System.nanoTime() - startTime);
return true;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index 15ad856245..c565842d66 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -42,6 +42,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateAggregationFromRawData;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.isAllAggregatorsHasFinalResult;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.AGG_SCAN_OPERATOR;
public abstract class AbstractSeriesAggregationScanOperator implements DataSourceOperator {
@@ -141,28 +142,31 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
// start stopwatch
long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
long start = System.nanoTime();
+ try {
+ while (System.nanoTime() - start < maxRuntime
+ && timeRangeIterator.hasNextTimeRange()
+ && !resultTsBlockBuilder.isFull()) {
+ // move to next time window
+ curTimeRange = timeRangeIterator.nextTimeRange();
+
+ // clear previous aggregation result
+ for (Aggregator aggregator : aggregators) {
+ aggregator.updateTimeRange(curTimeRange);
+ }
- while (System.nanoTime() - start < maxRuntime
- && timeRangeIterator.hasNextTimeRange()
- && !resultTsBlockBuilder.isFull()) {
- // move to next time window
- curTimeRange = timeRangeIterator.nextTimeRange();
-
- // clear previous aggregation result
- for (Aggregator aggregator : aggregators) {
- aggregator.updateTimeRange(curTimeRange);
+ // calculate aggregation result on current time window
+ calculateNextAggregationResult();
}
- // calculate aggregation result on current time window
- calculateNextAggregationResult();
- }
-
- if (resultTsBlockBuilder.getPositionCount() > 0) {
- TsBlock resultTsBlock = resultTsBlockBuilder.build();
- resultTsBlockBuilder.reset();
- return resultTsBlock;
- } else {
- return null;
+ if (resultTsBlockBuilder.getPositionCount() > 0) {
+ TsBlock resultTsBlock = resultTsBlockBuilder.build();
+ resultTsBlockBuilder.reset();
+ return resultTsBlock;
+ } else {
+ return null;
+ }
+ } finally {
+ operatorContext.addOperatorTime(AGG_SCAN_OPERATOR, System.nanoTime() - start);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
index 8406437802..e22d7b1f65 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
@@ -29,6 +29,8 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import java.io.IOException;
import java.util.HashSet;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.ALIGNED_SERIES_SCAN_OPERATOR;
+
public class AlignedSeriesScanOperator implements DataSourceOperator {
private final OperatorContext operatorContext;
@@ -81,7 +83,7 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
@Override
public boolean hasNext() {
-
+ long startTime = System.nanoTime();
try {
if (hasCachedTsBlock) {
return true;
@@ -115,6 +117,8 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
return hasCachedTsBlock;
} catch (IOException e) {
throw new RuntimeException("Error happened while scanning the file", e);
+ } finally {
+ operatorContext.addOperatorTime(ALIGNED_SERIES_SCAN_OPERATOR, System.nanoTime() - startTime);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
index 05685f758d..a993230378 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
@@ -30,6 +30,8 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import java.io.IOException;
import java.util.Set;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.SERIES_SCAN_OPERATOR;
+
public class SeriesScanOperator implements DataSourceOperator {
private final OperatorContext operatorContext;
@@ -82,7 +84,7 @@ public class SeriesScanOperator implements DataSourceOperator {
@Override
public boolean hasNext() {
-
+ long startTime = System.nanoTime();
try {
if (hasCachedTsBlock) {
return true;
@@ -116,6 +118,8 @@ public class SeriesScanOperator implements DataSourceOperator {
return hasCachedTsBlock;
} catch (IOException e) {
throw new RuntimeException("Error happened while scanning the file", e);
+ } finally {
+ operatorContext.addOperatorTime(SERIES_SCAN_OPERATOR, System.nanoTime() - startTime);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index 22fd2cbabc..160a794230 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -58,6 +58,7 @@ import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.PAGE_READER;
public class SeriesScanUtil {
private final FragmentInstanceContext context;
@@ -1124,10 +1125,12 @@ public class SeriesScanUtil {
}
TsBlock getAllSatisfiedPageData(boolean ascending) throws IOException {
+ long startTime = System.nanoTime();
TsBlock tsBlock = data.getAllSatisfiedData();
if (!ascending) {
tsBlock.reverse();
}
+ context.addOperationTime(PAGE_READER, System.nanoTime() - startTime);
return tsBlock;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java
index e61e22f3ff..abe9b512bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java
@@ -46,6 +46,22 @@ public class QueryStatistics {
private final Map<String, OperationStatistic> operationStatistics = new ConcurrentHashMap<>();
+ public static final String LOCAL_EXECUTION_PLANNER = "LocalExecutionPlanner";
+
+ public static final String QUERY_EXECUTION = "QueryExecution";
+
+ public static final String SERIES_SCAN_OPERATOR = "SeriesScanOperator";
+
+ public static final String ALIGNED_SERIES_SCAN_OPERATOR = "AlignedSeriesScanOperator";
+
+ public static final String AGG_SCAN_OPERATOR = "AbstractSeriesAggregationScanOperator";
+
+ public static final String FILTER_AND_PROJECT_OPERATOR = "FilterAndProjectOperator";
+
+ public static final String SINGLE_INPUT_AGG_OPERATOR = "SingleInputAggregationOperator";
+
+ public static final String PAGE_READER = "IPageReader";
+
private QueryStatistics() {
ScheduledExecutorService scheduledExecutor =
IoTDBThreadPoolFactory.newScheduledThreadPool(1, "Query-Statistics-Print");
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index e0575880ea..a842859335 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.utils;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -44,11 +45,15 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.QUERY_EXECUTION;
+
/** TimeValuePairUtils to convert between thrift format and TsFile format. */
public class QueryDataSetUtils {
private static final int FLAG = 0x01;
+ private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance();
+
private QueryDataSetUtils() {}
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
@@ -381,27 +386,32 @@ public class QueryDataSetUtils {
// To fetch required amounts of data and combine them through List
public static Pair<List<ByteBuffer>, Boolean> convertQueryResultByFetchSize(
IQueryExecution queryExecution, int fetchSize) throws IoTDBException {
- int rowCount = 0;
- List<ByteBuffer> res = new ArrayList<>();
- while (rowCount < fetchSize) {
- Optional<ByteBuffer> optionalByteBuffer = queryExecution.getByteBufferBatchResult();
- if (!optionalByteBuffer.isPresent()) {
- break;
- }
- ByteBuffer byteBuffer = optionalByteBuffer.get();
- byteBuffer.mark();
- int valueColumnCount = byteBuffer.getInt();
- for (int i = 0; i < valueColumnCount; i++) {
- byteBuffer.get();
- }
- int positionCount = byteBuffer.getInt();
- byteBuffer.reset();
- if (positionCount != 0) {
- res.add(byteBuffer);
+ long startTime = System.nanoTime();
+ try {
+ int rowCount = 0;
+ List<ByteBuffer> res = new ArrayList<>();
+ while (rowCount < fetchSize) {
+ Optional<ByteBuffer> optionalByteBuffer = queryExecution.getByteBufferBatchResult();
+ if (!optionalByteBuffer.isPresent()) {
+ break;
+ }
+ ByteBuffer byteBuffer = optionalByteBuffer.get();
+ byteBuffer.mark();
+ int valueColumnCount = byteBuffer.getInt();
+ for (int i = 0; i < valueColumnCount; i++) {
+ byteBuffer.get();
+ }
+ int positionCount = byteBuffer.getInt();
+ byteBuffer.reset();
+ if (positionCount != 0) {
+ res.add(byteBuffer);
+ }
+ rowCount += positionCount;
}
- rowCount += positionCount;
+ return new Pair<>(res, !queryExecution.hasNextResult());
+ } finally {
+ QUERY_STATISTICS.addCost(QUERY_EXECUTION, System.nanoTime() - startTime);
}
- return new Pair<>(res, !queryExecution.hasNextResult());
}
public static long[] readTimesFromBuffer(ByteBuffer buffer, int size) {