You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/12/18 15:58:45 UTC
[iotdb] 02/05: add metrics: operator_execution
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/addQueryMetrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 868a7586c18124034f409d00069ecedf67ee57a4
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Thu Dec 15 16:46:44 2022 +0800
add metrics: operator_execution
---
.../apache/iotdb/metrics/config/MetricConfig.java | 5 ++--
.../iotdb/commons/service/metric/enums/Metric.java | 4 ++-
.../iotdb/db/mpp/execution/driver/Driver.java | 19 ++++++++++--
.../iotdb/db/mpp/execution/operator/Operator.java | 23 ++++++++++++++
.../db/mpp/execution/operator/OperatorContext.java | 19 ++++++++++++
.../operator/process/AbstractIntoOperator.java | 4 +--
.../operator/process/AggregationOperator.java | 4 +--
.../operator/process/DeviceMergeOperator.java | 6 ++--
.../operator/process/DeviceViewOperator.java | 6 ++--
.../execution/operator/process/FillOperator.java | 4 +--
.../operator/process/FilterAndProjectOperator.java | 4 +--
.../execution/operator/process/LimitOperator.java | 4 +--
.../operator/process/LinearFillOperator.java | 6 ++--
.../operator/process/MergeSortOperator.java | 6 ++--
.../execution/operator/process/OffsetOperator.java | 4 +--
.../process/RawDataAggregationOperator.java | 8 ++---
.../operator/process/SingleDeviceViewOperator.java | 6 ++--
.../process/SingleInputAggregationOperator.java | 2 +-
.../process/SlidingWindowAggregationOperator.java | 6 ++--
.../operator/process/TagAggregationOperator.java | 6 ++--
.../operator/process/TransformOperator.java | 2 +-
.../process/join/RowBasedTimeJoinOperator.java | 6 ++--
.../operator/process/join/TimeJoinOperator.java | 6 ++--
.../process/join/VerticallyConcatOperator.java | 6 ++--
.../last/AbstractUpdateLastCacheOperator.java | 2 +-
.../last/AlignedUpdateLastCacheOperator.java | 2 +-
.../process/last/LastQueryCollectOperator.java | 6 ++--
.../process/last/LastQueryMergeOperator.java | 6 ++--
.../operator/process/last/LastQueryOperator.java | 6 ++--
.../process/last/LastQuerySortOperator.java | 6 ++--
.../process/last/UpdateLastCacheOperator.java | 2 +-
.../operator/schema/CountMergeOperator.java | 6 ++--
.../schema/LevelTimeSeriesCountOperator.java | 2 +-
.../schema/NodeManageMemoryMergeOperator.java | 4 +--
.../operator/schema/NodePathsConvertOperator.java | 4 +--
.../operator/schema/NodePathsCountOperator.java | 4 +--
.../operator/schema/SchemaFetchMergeOperator.java | 6 ++--
.../operator/schema/SchemaQueryMergeOperator.java | 6 ++--
.../schema/SchemaQueryOrderByHeatOperator.java | 6 ++--
.../operator/schema/SchemaQueryScanOperator.java | 2 +-
.../AbstractSeriesAggregationScanOperator.java | 2 +-
.../operator/source/LastCacheScanOperator.java | 2 +-
.../iotdb/db/mpp/metric/QueryMetricsManager.java | 35 ++++++++++++++++++----
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 5 ++--
.../apache/iotdb/db/mpp/plan/analyze/Analyzer.java | 2 +-
.../db/mpp/plan/execution/QueryExecution.java | 2 +-
.../db/mpp/plan/parser/StatementGenerator.java | 2 +-
.../iotdb/db/mpp/plan/planner/LogicalPlanner.java | 3 +-
.../dag/input/TsBlockInputDataSet.java | 6 ++--
49 files changed, 190 insertions(+), 105 deletions(-)
diff --git a/metrics/interface/src/main/java/org/apache/iotdb/metrics/config/MetricConfig.java b/metrics/interface/src/main/java/org/apache/iotdb/metrics/config/MetricConfig.java
index b17410e12d..3d50eac41d 100644
--- a/metrics/interface/src/main/java/org/apache/iotdb/metrics/config/MetricConfig.java
+++ b/metrics/interface/src/main/java/org/apache/iotdb/metrics/config/MetricConfig.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.ReporterType;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Arrays;
import java.util.List;
import java.util.Objects;
@@ -34,7 +34,8 @@ public class MetricConfig {
private MetricFrameType metricFrameType = MetricFrameType.MICROMETER;
/** The list of reporters provide metrics for external tool */
- private List<ReporterType> metricReporterList = Collections.singletonList(ReporterType.JMX);
+ private List<ReporterType> metricReporterList =
+ Arrays.asList(ReporterType.JMX, ReporterType.PROMETHEUS);
/** The level of metric service */
private MetricLevel metricLevel = MetricLevel.IMPORTANT;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index cc9b017328..1c0ab35d31 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -62,7 +62,9 @@ public enum Metric {
THRIFT_ACTIVE_THREADS,
IOT_CONSENSUS,
STAGE,
- QUERY_PLAN_COST;
+ QUERY_PLAN_COST,
+ OPERATOR_EXECUTION_COST,
+ OPERATOR_EXECUTION_COUNT;
@Override
public String toString() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index 086234fa12..92d8574294 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.mpp.execution.driver;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.collect.ImmutableList;
@@ -58,6 +60,8 @@ public abstract class Driver implements IDriver {
protected final DriverLock exclusiveLock = new DriverLock();
+ protected final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+
protected enum State {
ALIVE,
NEED_DESTRUCTION,
@@ -182,8 +186,8 @@ public abstract class Driver implements IDriver {
if (!blocked.isDone()) {
return blocked;
}
- if (root.hasNext()) {
- TsBlock tsBlock = root.next();
+ if (root.hasNextWithTimer()) {
+ TsBlock tsBlock = root.nextWithTimer();
if (tsBlock != null && !tsBlock.isEmpty()) {
sinkHandle.send(tsBlock);
}
@@ -332,6 +336,17 @@ public abstract class Driver implements IDriver {
try {
root.close();
sinkHandle.setNoMoreTsBlocks();
+
+ // record operator execution statistics to metrics
+ List<OperatorContext> operatorContexts =
+ driverContext.getFragmentInstanceContext().getOperatorContexts();
+ for (OperatorContext operatorContext : operatorContexts) {
+ String operatorType = operatorContext.getOperatorType();
+ QUERY_METRICS.recordOperatorExecutionCost(
+ operatorType, operatorContext.getTotalExecutionTimeInNanos());
+ QUERY_METRICS.recordOperatorExecutionCount(
+ operatorType, operatorContext.getNextCalledCount());
+ }
} catch (InterruptedException t) {
// don't record the stack
wasInterrupted = true;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
index b7b05a9289..aaf404d467 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
@@ -38,9 +38,32 @@ public interface Operator extends AutoCloseable {
return NOT_BLOCKED;
}
+ default TsBlock nextWithTimer() {
+ OperatorContext context = getOperatorContext();
+ long startTime = System.nanoTime();
+
+ try {
+ return next();
+ } finally {
+ context.recordExecutionTime(System.nanoTime() - startTime);
+ context.recordNextCalled();
+ }
+ }
+
/** Gets next tsBlock from this operator. If no data is currently available, return null. */
TsBlock next();
+ default boolean hasNextWithTimer() {
+ OperatorContext context = getOperatorContext();
+ long startTime = System.nanoTime();
+
+ try {
+ return hasNext();
+ } finally {
+ context.recordExecutionTime(System.nanoTime() - startTime);
+ }
+ }
+
/** @return true if the operator has more data, otherwise false */
boolean hasNext();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java
index 447066f4c8..3e9620a1b2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java
@@ -40,6 +40,9 @@ public class OperatorContext {
private Duration maxRunTime;
+ private long totalExecutionTimeInNanos = 0L;
+ private long nextCalledCount = 0L;
+
public OperatorContext(
int operatorId,
PlanNodeId planNodeId,
@@ -75,6 +78,22 @@ public class OperatorContext {
return instanceContext.getSessionInfo();
}
+ public void recordExecutionTime(long executionTimeInNanos) {
+ this.totalExecutionTimeInNanos += executionTimeInNanos;
+ }
+
+ public void recordNextCalled() {
+ this.nextCalledCount++;
+ }
+
+ public long getTotalExecutionTimeInNanos() {
+ return totalExecutionTimeInNanos;
+ }
+
+ public long getNextCalledCount() {
+ return nextCalledCount;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 5f4b8dda34..ad24a16ade 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -139,8 +139,8 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
}
cachedTsBlock = null;
- if (child.hasNext()) {
- TsBlock inputTsBlock = child.next();
+ if (child.hasNextWithTimer()) {
+ TsBlock inputTsBlock = child.nextWithTimer();
processTsBlock(inputTsBlock);
// call child.next only once
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 0d7a6c89f7..aa86c6585a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -175,7 +175,7 @@ public class AggregationOperator implements ProcessOperator {
@Override
public boolean isFinished() {
- return !this.hasNext();
+ return !this.hasNextWithTimer();
}
@Override
@@ -194,7 +194,7 @@ public class AggregationOperator implements ProcessOperator {
return false;
}
- inputTsBlocks[i] = children.get(i).next();
+ inputTsBlocks[i] = children.get(i).nextWithTimer();
canCallNext[i] = false;
if (inputTsBlocks[i] == null) {
return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
index 511a75991b..c08895ceaa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
@@ -115,8 +115,8 @@ public class DeviceMergeOperator implements ProcessOperator {
public TsBlock next() {
// get new input TsBlock
for (int i = 0; i < inputOperatorsCount; i++) {
- if (!noMoreTsBlocks[i] && isTsBlockEmpty(i) && deviceOperators.get(i).hasNext()) {
- inputTsBlocks[i] = deviceOperators.get(i).next();
+ if (!noMoreTsBlocks[i] && isTsBlockEmpty(i) && deviceOperators.get(i).hasNextWithTimer()) {
+ inputTsBlocks[i] = deviceOperators.get(i).nextWithTimer();
if (inputTsBlocks[i] == null || inputTsBlocks[i].isEmpty()) {
return null;
}
@@ -204,7 +204,7 @@ public class DeviceMergeOperator implements ProcessOperator {
if (!isTsBlockEmpty(i)) {
return true;
} else if (!noMoreTsBlocks[i]) {
- if (deviceOperators.get(i).hasNext()) {
+ if (deviceOperators.get(i).hasNextWithTimer()) {
return true;
} else {
noMoreTsBlocks[i] = true;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
index 9a3a5e6634..242586a78e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
@@ -108,11 +108,11 @@ public class DeviceViewOperator implements ProcessOperator {
@Override
public TsBlock next() {
- if (!getCurDeviceOperator().hasNext()) {
+ if (!getCurDeviceOperator().hasNextWithTimer()) {
deviceIndex++;
return null;
}
- TsBlock tsBlock = getCurDeviceOperator().next();
+ TsBlock tsBlock = getCurDeviceOperator().nextWithTimer();
if (tsBlock == null) {
return null;
}
@@ -151,7 +151,7 @@ public class DeviceViewOperator implements ProcessOperator {
@Override
public boolean isFinished() {
- return !this.hasNext();
+ return !this.hasNextWithTimer();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
index c182168bd3..92ec771f32 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
@@ -58,7 +58,7 @@ public class FillOperator implements ProcessOperator {
@Override
public TsBlock next() {
- TsBlock block = child.next();
+ TsBlock block = child.nextWithTimer();
if (block == null) {
return null;
}
@@ -79,7 +79,7 @@ public class FillOperator implements ProcessOperator {
@Override
public boolean hasNext() {
- return child.hasNext();
+ return child.hasNextWithTimer();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
index fcb03660a8..c08fcfd27d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
@@ -95,7 +95,7 @@ public class FilterAndProjectOperator implements ProcessOperator {
@Override
public TsBlock next() {
- TsBlock input = inputOperator.next();
+ TsBlock input = inputOperator.nextWithTimer();
if (input == null) {
return null;
}
@@ -193,7 +193,7 @@ public class FilterAndProjectOperator implements ProcessOperator {
@Override
public boolean hasNext() {
- return inputOperator.hasNext();
+ return inputOperator.hasNextWithTimer();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
index 726f5e7ecf..68604cd1ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
@@ -52,7 +52,7 @@ public class LimitOperator implements ProcessOperator {
@Override
public TsBlock next() {
- TsBlock block = child.next();
+ TsBlock block = child.nextWithTimer();
if (block == null) {
return null;
}
@@ -68,7 +68,7 @@ public class LimitOperator implements ProcessOperator {
@Override
public boolean hasNext() {
- return remainingLimit > 0 && child.hasNext();
+ return remainingLimit > 0 && child.hasNextWithTimer();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
index bcbb92a932..3792829196 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
@@ -87,7 +87,7 @@ public class LinearFillOperator implements ProcessOperator {
// make sure we call child.next() at most once
if (cachedTsBlock.isEmpty()) {
canCallNext = false;
- TsBlock nextTsBlock = child.next();
+ TsBlock nextTsBlock = child.nextWithTimer();
// child operator's calculation is not finished, so we just return null
if (nextTsBlock == null || nextTsBlock.isEmpty()) {
return nextTsBlock;
@@ -144,7 +144,7 @@ public class LinearFillOperator implements ProcessOperator {
@Override
public boolean hasNext() {
// if child.hasNext() return false, it means that there is no more tsBlocks
- noMoreTsBlock = !child.hasNext();
+ noMoreTsBlock = !child.hasNextWithTimer();
// if there is more tsBlock, we can call child.next() once
canCallNext = !noMoreTsBlock;
return !cachedTsBlock.isEmpty() || !noMoreTsBlock;
@@ -212,7 +212,7 @@ public class LinearFillOperator implements ProcessOperator {
if (canCallNext) { // if we can call child.next(), we call that and cache it in
// cachedTsBlock
canCallNext = false;
- TsBlock nextTsBlock = child.next();
+ TsBlock nextTsBlock = child.nextWithTimer();
// child operator's calculation is not finished, so we just return null
if (nextTsBlock == null || nextTsBlock.isEmpty()) {
return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
index 372d01cb23..db11810fa2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
@@ -96,8 +96,8 @@ public class MergeSortOperator implements ProcessOperator {
public TsBlock next() {
// 1. fill consumed up TsBlock
for (int i = 0; i < inputOperatorsCount; i++) {
- if (!noMoreTsBlocks[i] && isTsBlockEmpty(i) && inputOperators.get(i).hasNext()) {
- inputTsBlocks[i] = inputOperators.get(i).next();
+ if (!noMoreTsBlocks[i] && isTsBlockEmpty(i) && inputOperators.get(i).hasNextWithTimer()) {
+ inputTsBlocks[i] = inputOperators.get(i).nextWithTimer();
if (inputTsBlocks[i] == null || inputTsBlocks[i].isEmpty()) {
return null;
}
@@ -160,7 +160,7 @@ public class MergeSortOperator implements ProcessOperator {
if (!isTsBlockEmpty(i)) {
return true;
} else if (!noMoreTsBlocks[i]) {
- if (inputOperators.get(i).hasNext()) {
+ if (inputOperators.get(i).hasNextWithTimer()) {
return true;
} else {
noMoreTsBlocks[i] = true;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
index 572738d081..e6f146eac1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
@@ -52,7 +52,7 @@ public class OffsetOperator implements ProcessOperator {
@Override
public TsBlock next() {
- TsBlock block = child.next();
+ TsBlock block = child.nextWithTimer();
if (block == null) {
return null;
}
@@ -67,7 +67,7 @@ public class OffsetOperator implements ProcessOperator {
@Override
public boolean hasNext() {
- return child.hasNext();
+ return child.hasNextWithTimer();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index 30488c18d3..b5ce5ef823 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -58,7 +58,7 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
}
private boolean hasMoreData() {
- return inputTsBlock != null || child.hasNext();
+ return inputTsBlock != null || child.hasNextWithTimer();
}
@Override
@@ -72,10 +72,10 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
inputTsBlock = null;
// NOTE: child.next() can only be invoked once
- if (child.hasNext() && canCallNext) {
- inputTsBlock = child.next();
+ if (child.hasNextWithTimer() && canCallNext) {
+ inputTsBlock = child.nextWithTimer();
canCallNext = false;
- } else if (child.hasNext()) {
+ } else if (child.hasNextWithTimer()) {
// if child still has next but can't be invoked now
return false;
} else {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleDeviceViewOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleDeviceViewOperator.java
index 0107af8121..a22d52c9e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleDeviceViewOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleDeviceViewOperator.java
@@ -82,7 +82,7 @@ public class SingleDeviceViewOperator implements ProcessOperator {
@Override
public TsBlock next() {
- TsBlock tsBlock = deviceOperator.next();
+ TsBlock tsBlock = deviceOperator.nextWithTimer();
if (tsBlock == null) {
return null;
}
@@ -104,7 +104,7 @@ public class SingleDeviceViewOperator implements ProcessOperator {
@Override
public boolean hasNext() {
- return deviceOperator.hasNext();
+ return deviceOperator.hasNextWithTimer();
}
@Override
@@ -114,7 +114,7 @@ public class SingleDeviceViewOperator implements ProcessOperator {
@Override
public boolean isFinished() {
- return !this.hasNext();
+ return !this.hasNextWithTimer();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
index 16071aea1e..8953755d5e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
@@ -108,7 +108,7 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
@Override
public boolean isFinished() {
- return !this.hasNext();
+ return !this.hasNextWithTimer();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
index 76499849cb..805b183677 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -78,10 +78,10 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper
while (!isCalculationDone()) {
if (inputTsBlock == null) {
// NOTE: child.next() can only be invoked once
- if (child.hasNext() && canCallNext) {
- inputTsBlock = child.next();
+ if (child.hasNextWithTimer() && canCallNext) {
+ inputTsBlock = child.nextWithTimer();
canCallNext = false;
- } else if (child.hasNext()) {
+ } else if (child.hasNextWithTimer()) {
// if child still has next but can't be invoked now
return false;
} else {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
index cf18c26b74..778e0d28c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
@@ -127,7 +127,7 @@ public class TagAggregationOperator implements ProcessOperator {
}
// If the data is unavailable first, try to find next tsblock of the child.
- inputTsBlocks[i] = children.get(i).next();
+ inputTsBlocks[i] = children.get(i).nextWithTimer();
consumedIndices[i] = 0;
canCallNext[i] = false;
@@ -186,7 +186,7 @@ public class TagAggregationOperator implements ProcessOperator {
@Override
public boolean hasNext() {
for (int i = 0; i < children.size(); i++) {
- if (dataUnavailable(i) && !children.get(i).hasNext()) {
+ if (dataUnavailable(i) && !children.get(i).hasNextWithTimer()) {
return false;
}
}
@@ -195,7 +195,7 @@ public class TagAggregationOperator implements ProcessOperator {
@Override
public boolean isFinished() {
- return !this.hasNext();
+ return !this.hasNextWithTimer();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
index 4346ed231e..2e80c93139 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
@@ -342,7 +342,7 @@ public class TransformOperator implements ProcessOperator {
@Override
public boolean isFinished() {
// call hasNext first, or data of inputOperator could be missing
- boolean flag = !hasNext();
+ boolean flag = !hasNextWithTimer();
return timeHeap.isEmpty() && (flag || inputOperator.isFinished());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
index 695893d400..1c4211d003 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
@@ -137,9 +137,9 @@ public class RowBasedTimeJoinOperator implements ProcessOperator {
// among all the input TsBlock as the current output TsBlock's endTime.
for (int i = 0; i < inputOperatorsCount; i++) {
if (!noMoreTsBlocks[i] && empty(i)) {
- if (children.get(i).hasNext()) {
+ if (children.get(i).hasNextWithTimer()) {
inputIndex[i] = 0;
- inputTsBlocks[i] = children.get(i).next();
+ inputTsBlocks[i] = children.get(i).nextWithTimer();
if (!empty(i)) {
updateTimeSelector(i);
} else {
@@ -210,7 +210,7 @@ public class RowBasedTimeJoinOperator implements ProcessOperator {
if (!empty(i)) {
return true;
} else if (!noMoreTsBlocks[i]) {
- if (children.get(i).hasNext()) {
+ if (children.get(i).hasNextWithTimer()) {
return true;
} else {
noMoreTsBlocks[i] = true;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
index 3170838986..18bcd8836a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
@@ -137,9 +137,9 @@ public class TimeJoinOperator implements ProcessOperator {
// among all the input TsBlock as the current output TsBlock's endTime.
for (int i = 0; i < inputOperatorsCount; i++) {
if (!noMoreTsBlocks[i] && empty(i)) {
- if (children.get(i).hasNext()) {
+ if (children.get(i).hasNextWithTimer()) {
inputIndex[i] = 0;
- inputTsBlocks[i] = children.get(i).next();
+ inputTsBlocks[i] = children.get(i).nextWithTimer();
if (!empty(i)) {
int rowSize = inputTsBlocks[i].getPositionCount();
for (int row = 0; row < rowSize; row++) {
@@ -209,7 +209,7 @@ public class TimeJoinOperator implements ProcessOperator {
if (!empty(i)) {
return true;
} else if (!noMoreTsBlocks[i]) {
- if (children.get(i).hasNext()) {
+ if (children.get(i).hasNextWithTimer()) {
return true;
} else {
noMoreTsBlocks[i] = true;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/VerticallyConcatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/VerticallyConcatOperator.java
index 2a4ffd58c5..1fe4390082 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/VerticallyConcatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/VerticallyConcatOperator.java
@@ -99,7 +99,7 @@ public class VerticallyConcatOperator implements ProcessOperator {
for (int i = 0; i < inputOperatorsCount; i++) {
if (empty(i)) {
inputIndex[i] = 0;
- inputTsBlocks[i] = children.get(i).next();
+ inputTsBlocks[i] = children.get(i).nextWithTimer();
if (empty(i)) {
// child operator has not prepared TsBlock well
return null;
@@ -144,7 +144,7 @@ public class VerticallyConcatOperator implements ProcessOperator {
if (finished) {
return false;
}
- return !empty(0) || children.get(0).hasNext();
+ return !empty(0) || children.get(0).hasNextWithTimer();
}
@Override
@@ -159,7 +159,7 @@ public class VerticallyConcatOperator implements ProcessOperator {
if (finished) {
return true;
}
- return finished = empty(0) && !children.get(0).hasNext();
+ return finished = empty(0) && !children.get(0).hasNextWithTimer();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
index 4850fb45c4..3c182e0668 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
@@ -82,7 +82,7 @@ public abstract class AbstractUpdateLastCacheOperator implements ProcessOperator
@Override
public boolean hasNext() {
- return child.hasNext();
+ return child.hasNextWithTimer();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
index 14354cbc3b..f78c19e4fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
@@ -51,7 +51,7 @@ public class AlignedUpdateLastCacheOperator extends AbstractUpdateLastCacheOpera
@Override
public TsBlock next() {
- TsBlock res = child.next();
+ TsBlock res = child.nextWithTimer();
if (res == null) {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
index b91e346372..b69ecfa388 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
@@ -61,8 +61,8 @@ public class LastQueryCollectOperator implements ProcessOperator {
@Override
public TsBlock next() {
- if (children.get(currentIndex).hasNext()) {
- return children.get(currentIndex).next();
+ if (children.get(currentIndex).hasNextWithTimer()) {
+ return children.get(currentIndex).nextWithTimer();
} else {
currentIndex++;
return null;
@@ -83,7 +83,7 @@ public class LastQueryCollectOperator implements ProcessOperator {
@Override
public boolean isFinished() {
- return !hasNext();
+ return !hasNextWithTimer();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
index 5ad04bdc2c..fcc074b024 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
@@ -116,9 +116,9 @@ public class LastQueryMergeOperator implements ProcessOperator {
// among all the input TsBlock as the current output TsBlock's endTimeSeries.
for (int i = 0; i < inputOperatorsCount; i++) {
if (!noMoreTsBlocks[i] && empty(i)) {
- if (children.get(i).hasNext()) {
+ if (children.get(i).hasNextWithTimer()) {
inputIndex[i] = 0;
- inputTsBlocks[i] = children.get(i).next();
+ inputTsBlocks[i] = children.get(i).nextWithTimer();
if (!empty(i)) {
int rowSize = inputTsBlocks[i].getPositionCount();
for (int row = 0; row < rowSize; row++) {
@@ -187,7 +187,7 @@ public class LastQueryMergeOperator implements ProcessOperator {
if (!empty(i)) {
return true;
} else if (!noMoreTsBlocks[i]) {
- if (children.get(i).hasNext()) {
+ if (children.get(i).hasNextWithTimer()) {
return true;
} else {
noMoreTsBlocks[i] = true;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
index 7a689d4aa0..c0f825e02a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
@@ -104,8 +104,8 @@ public class LastQueryOperator implements ProcessOperator {
while ((System.nanoTime() - start < maxRuntime)
&& (currentIndex < endIndex)
&& !tsBlockBuilder.isFull()) {
- if (children.get(currentIndex).hasNext()) {
- TsBlock tsBlock = children.get(currentIndex).next();
+ if (children.get(currentIndex).hasNextWithTimer()) {
+ TsBlock tsBlock = children.get(currentIndex).nextWithTimer();
if (tsBlock == null) {
return null;
} else if (!tsBlock.isEmpty()) {
@@ -127,7 +127,7 @@ public class LastQueryOperator implements ProcessOperator {
@Override
public boolean isFinished() {
- return !hasNext();
+ return !hasNextWithTimer();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
index aeb38b6b7d..3e16a8e6d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
@@ -148,8 +148,8 @@ public class LastQuerySortOperator implements ProcessOperator {
previousTsBlock = null;
}
} else {
- if (children.get(currentIndex).hasNext()) {
- TsBlock tsBlock = children.get(currentIndex).next();
+ if (children.get(currentIndex).hasNextWithTimer()) {
+ TsBlock tsBlock = children.get(currentIndex).nextWithTimer();
if (tsBlock == null) {
return null;
} else if (!tsBlock.isEmpty()) {
@@ -189,7 +189,7 @@ public class LastQuerySortOperator implements ProcessOperator {
@Override
public boolean isFinished() {
- return !hasNext();
+ return !hasNextWithTimer();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
index 7315c81f46..69972a40e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
@@ -53,7 +53,7 @@ public class UpdateLastCacheOperator extends AbstractUpdateLastCacheOperator {
@Override
public TsBlock next() {
- TsBlock res = child.next();
+ TsBlock res = child.nextWithTimer();
if (res == null) {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
index 61a8758770..32f215716e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
@@ -96,8 +96,8 @@ public class CountMergeOperator implements ProcessOperator {
if (childrenTsBlocks[i] == null) {
// when this operator is not blocked, it means all children that have not return TsBlock is
// not blocked.
- if (children.get(i).hasNext()) {
- TsBlock tsBlock = children.get(i).next();
+ if (children.get(i).hasNextWithTimer()) {
+ TsBlock tsBlock = children.get(i).nextWithTimer();
if (tsBlock == null || tsBlock.isEmpty()) {
allChildrenReady = false;
} else {
@@ -164,7 +164,7 @@ public class CountMergeOperator implements ProcessOperator {
@Override
public boolean isFinished() {
- return !hasNext();
+ return !hasNextWithTimer();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
index 9a4f471eda..ab33978776 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
@@ -140,7 +140,7 @@ public class LevelTimeSeriesCountOperator implements SourceOperator {
@Override
public boolean isFinished() {
- return !hasNext();
+ return !hasNextWithTimer();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
index 781ba46695..d9315fcdbc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
@@ -79,7 +79,7 @@ public class NodeManageMemoryMergeOperator implements ProcessOperator {
isReadingMemory = false;
return transferToTsBlock(data);
} else {
- TsBlock block = child.next();
+ TsBlock block = child.nextWithTimer();
if (block == null) {
return null;
}
@@ -126,7 +126,7 @@ public class NodeManageMemoryMergeOperator implements ProcessOperator {
@Override
public boolean hasNext() {
- return isReadingMemory || child.hasNext();
+ return isReadingMemory || child.hasNextWithTimer();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
index 217a40bd17..4d04a62e86 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
@@ -70,7 +70,7 @@ public class NodePathsConvertOperator implements ProcessOperator {
@Override
public TsBlock next() {
- TsBlock block = child.next();
+ TsBlock block = child.nextWithTimer();
if (block == null || block.isEmpty()) {
return null;
}
@@ -95,7 +95,7 @@ public class NodePathsConvertOperator implements ProcessOperator {
@Override
public boolean hasNext() {
- return child.hasNext();
+ return child.hasNextWithTimer();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
index cf026777fa..3a662afe13 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
@@ -71,8 +71,8 @@ public class NodePathsCountOperator implements ProcessOperator {
if (!blocked.isDone()) {
return null;
}
- if (child.hasNext()) {
- TsBlock tsBlock = child.next();
+ if (child.hasNextWithTimer()) {
+ TsBlock tsBlock = child.nextWithTimer();
if (null != tsBlock && !tsBlock.isEmpty()) {
for (int i = 0; i < tsBlock.getPositionCount(); i++) {
String path = tsBlock.getColumn(0).getBinary(i).toString();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
index 07a531e80c..977cd388df 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
@@ -72,8 +72,8 @@ public class SchemaFetchMergeOperator implements ProcessOperator {
return generateStorageGroupInfo();
}
- if (children.get(currentIndex).hasNext()) {
- return children.get(currentIndex).next();
+ if (children.get(currentIndex).hasNextWithTimer()) {
+ return children.get(currentIndex).nextWithTimer();
} else {
currentIndex++;
return null;
@@ -94,7 +94,7 @@ public class SchemaFetchMergeOperator implements ProcessOperator {
@Override
public boolean isFinished() {
- return !hasNext();
+ return !hasNextWithTimer();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
index f2671034e8..0ad36268a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
@@ -51,8 +51,8 @@ public class SchemaQueryMergeOperator implements ProcessOperator {
@Override
public TsBlock next() {
- if (children.get(currentIndex).hasNext()) {
- return children.get(currentIndex).next();
+ if (children.get(currentIndex).hasNextWithTimer()) {
+ return children.get(currentIndex).nextWithTimer();
} else {
currentIndex++;
return null;
@@ -71,7 +71,7 @@ public class SchemaQueryMergeOperator implements ProcessOperator {
@Override
public boolean isFinished() {
- return !hasNext();
+ return !hasNextWithTimer();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
index 68419ec464..945961f828 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
@@ -87,8 +87,8 @@ public class SchemaQueryOrderByHeatOperator implements ProcessOperator {
if (operator.isFinished()) {
noMoreTsBlocks[i] = true;
} else {
- if (operator.hasNext()) {
- TsBlock tsBlock = operator.next();
+ if (operator.hasNextWithTimer()) {
+ TsBlock tsBlock = operator.nextWithTimer();
if (null != tsBlock && !tsBlock.isEmpty()) {
if (isShowTimeSeriesBlock(tsBlock)) {
showTimeSeriesResult.add(tsBlock);
@@ -203,7 +203,7 @@ public class SchemaQueryOrderByHeatOperator implements ProcessOperator {
@Override
public boolean isFinished() {
- return !hasNext();
+ return !hasNextWithTimer();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
index 23bbcd4366..77bdc7b6cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
@@ -107,7 +107,7 @@ public abstract class SchemaQueryScanOperator implements SourceOperator {
@Override
public boolean isFinished() {
- return !hasNext();
+ return !hasNextWithTimer();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index 15ad856245..ed6c92c199 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -168,7 +168,7 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
@Override
public boolean isFinished() {
- return finished || (finished = !hasNext());
+ return finished || (finished = !hasNextWithTimer());
}
protected void calculateNextAggregationResult() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
index 974758f8a5..6c5185fbf2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
@@ -54,7 +54,7 @@ public class LastCacheScanOperator implements SourceOperator {
@Override
public boolean isFinished() {
- return !hasNext();
+ return !hasNextWithTimer();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java
index 8a4e878733..dac48b1680 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java
@@ -22,18 +22,41 @@ package org.apache.iotdb.db.mpp.metric;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
-import org.apache.iotdb.metrics.type.Timer;
import org.apache.iotdb.metrics.utils.MetricLevel;
+import java.util.concurrent.TimeUnit;
+
public class QueryMetricsManager {
private final MetricService metricService = MetricService.getInstance();
- public void addPlanCost(String stage, long costTimeInNanos) {
- Timer timer =
- metricService.getOrCreateTimer(
- Metric.QUERY_PLAN_COST.toString(), MetricLevel.IMPORTANT, Tag.STAGE.toString(), stage);
- timer.updateNanos(costTimeInNanos);
+ public void recordPlanCost(String stage, long costTimeInNanos) {
+ metricService.timer(
+ costTimeInNanos,
+ TimeUnit.NANOSECONDS,
+ Metric.QUERY_PLAN_COST.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.STAGE.toString(),
+ stage);
+ }
+
+ public void recordOperatorExecutionCost(String operatorType, long costTimeInNanos) {
+ metricService.timer(
+ costTimeInNanos,
+ TimeUnit.NANOSECONDS,
+ Metric.OPERATOR_EXECUTION_COST.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ operatorType);
+ }
+
+ public void recordOperatorExecutionCount(String operatorType, long count) {
+ metricService.count(
+ count,
+ Metric.OPERATOR_EXECUTION_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ operatorType);
}
public static QueryMetricsManager getInstance() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 8b582fb05a..00d63837ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -211,7 +211,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
} else {
schemaTree = schemaFetcher.fetchSchema(patternTree);
}
- QueryMetricsManager.getInstance().addPlanCost(SCHEMA_FETCHER, System.nanoTime() - startTime);
+ QueryMetricsManager.getInstance()
+ .recordPlanCost(SCHEMA_FETCHER, System.nanoTime() - startTime);
logger.debug("[EndFetchSchema]");
// If there is no leaf node in the schema tree, the query should be completed immediately
@@ -1175,7 +1176,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
}
} finally {
QueryMetricsManager.getInstance()
- .addPlanCost(PARTITION_FETCHER, System.nanoTime() - startTime);
+ .recordPlanCost(PARTITION_FETCHER, System.nanoTime() - startTime);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index 679a71627f..54d6bc0e84 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -47,7 +47,7 @@ public class Analyzer {
new AnalyzeVisitor(partitionFetcher, schemaFetcher).process(statement, context);
if (statement.isQuery()) {
- QueryMetricsManager.getInstance().addPlanCost(ANALYZER, System.nanoTime() - startTime);
+ QueryMetricsManager.getInstance().recordPlanCost(ANALYZER, System.nanoTime() - startTime);
}
return analysis;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index c150183527..23e8c75029 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -308,7 +308,7 @@ public class QueryExecution implements IQueryExecution {
if (rawStatement.isQuery()) {
QueryMetricsManager.getInstance()
- .addPlanCost(DISTRIBUTION_PLANNER, System.nanoTime() - startTime);
+ .recordPlanCost(DISTRIBUTION_PLANNER, System.nanoTime() - startTime);
}
if (isQuery() && logger.isDebugEnabled()) {
logger.debug(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index 604442462e..f69362f66a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -520,7 +520,7 @@ public class StatementGenerator {
}
return astVisitor.visit(tree);
} finally {
- QueryMetricsManager.getInstance().addPlanCost(SQL_PARSER, System.nanoTime() - startTime);
+ QueryMetricsManager.getInstance().recordPlanCost(SQL_PARSER, System.nanoTime() - startTime);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index cc25e475ac..47d9b63e10 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -46,7 +46,8 @@ public class LogicalPlanner {
// optimize the query logical plan
if (analysis.getStatement().isQuery()) {
- QueryMetricsManager.getInstance().addPlanCost(LOGICAL_PLANNER, System.nanoTime() - startTime);
+ QueryMetricsManager.getInstance()
+ .recordPlanCost(LOGICAL_PLANNER, System.nanoTime() - startTime);
for (PlanOptimizer optimizer : optimizers) {
rootNode = optimizer.optimize(rootNode, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/TsBlockInputDataSet.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/TsBlockInputDataSet.java
index b7790a2953..8f6629aba3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/TsBlockInputDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/TsBlockInputDataSet.java
@@ -55,10 +55,10 @@ public class TsBlockInputDataSet implements IUDFInputDataSet {
if (operator.isBlocked() != Operator.NOT_BLOCKED) {
return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
}
- if (!operator.hasNext()) {
+ if (!operator.hasNextWithTimer()) {
return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
}
- final TsBlock tsBlock = operator.next();
+ final TsBlock tsBlock = operator.nextWithTimer();
if (tsBlock == null) {
return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
}
@@ -72,7 +72,7 @@ public class TsBlockInputDataSet implements IUDFInputDataSet {
if (operator.isBlocked() != Operator.NOT_BLOCKED) {
return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
}
- return operator.hasNext()
+ return operator.hasNextWithTimer()
? YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA
: YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
}