You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/08/14 15:34:01 UTC
[iotdb] branch lmh/AggOpMemoryControl updated: finish memory calculate for SeriesAggregationScanOperator
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/lmh/AggOpMemoryControl by this push:
new d9c912f413 finish memory calculate for SeriesAggregationScanOperator
d9c912f413 is described below
commit d9c912f4134d58882c1008739f091824d8f7ea16
Author: liuminghui233 <54...@qq.com>
AuthorDate: Sun Aug 14 23:33:21 2022 +0800
finish memory calculate for SeriesAggregationScanOperator
---
.../db/mpp/execution/operator/AggregationUtil.java | 79 +++++++++-----
.../AbstractSeriesAggregationScanOperator.java | 5 +-
.../AlignedSeriesAggregationScanOperator.java | 3 +
.../source/SeriesAggregationScanOperator.java | 3 +
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 120 +++++++++++++--------
.../operator/AggregationOperatorTest.java | 3 +
.../AlignedSeriesAggregationScanOperatorTest.java | 7 +-
.../execution/operator/LastQueryOperatorTest.java | 5 +
.../operator/LastQuerySortOperatorTest.java | 5 +
.../SeriesAggregationScanOperatorTest.java | 2 +
.../SlidingWindowAggregationOperatorTest.java | 2 +
.../operator/UpdateLastCacheOperatorTest.java | 2 +
12 files changed, 158 insertions(+), 78 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
index a47a94ff38..1896fc3af4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
@@ -19,10 +19,13 @@
package org.apache.iotdb.db.mpp.execution.operator;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.SingleTimeWindowIterator;
import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.TimeRangeIteratorFactory;
+import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.statistics.StatisticsManager;
@@ -41,8 +44,11 @@ import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.iotdb.tsfile.utils.Pair;
+import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
+import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
import static org.apache.iotdb.tsfile.read.common.block.TsBlockUtil.skipPointsOutOfTimeRange;
@@ -173,33 +179,21 @@ public class AggregationUtil {
public static long calculateMaxAggregationResultSize(
List<AggregationDescriptor> aggregationDescriptors,
ITimeRangeIterator timeRangeIterator,
- boolean isGroupByQuery) {
+ boolean isGroupByQuery,
+ TypeProvider typeProvider) {
long valueColumnsSizePerLine = 0;
- for (AggregationDescriptor descriptor : aggregationDescriptors) {}
-
- for (TSDataType tsDataType : outPutDataTypes) {
- switch (tsDataType) {
- case INT32:
- valueColumnsSizePerLine += IntColumn.SIZE_IN_BYTES_PER_POSITION;
- break;
- case INT64:
- valueColumnsSizePerLine += LongColumn.SIZE_IN_BYTES_PER_POSITION;
- break;
- case FLOAT:
- valueColumnsSizePerLine += FloatColumn.SIZE_IN_BYTES_PER_POSITION;
- break;
- case DOUBLE:
- valueColumnsSizePerLine += DoubleColumn.SIZE_IN_BYTES_PER_POSITION;
- break;
- case BOOLEAN:
- valueColumnsSizePerLine += BooleanColumn.SIZE_IN_BYTES_PER_POSITION;
- break;
- case TEXT:
- valueColumnsSizePerLine +=
- StatisticsManager.getInstance().getMaxBinarySizeInBytes(seriesPath);
- break;
- default:
- throw new UnsupportedOperationException("Unknown data type " + tsDataType);
+ for (AggregationDescriptor descriptor : aggregationDescriptors) {
+ List<TSDataType> outPutDataTypes =
+ descriptor.getOutputColumnNames().stream()
+ .map(typeProvider::getType)
+ .collect(Collectors.toList());
+ for (TSDataType tsDataType : outPutDataTypes) {
+ checkArgument(
+ descriptor.getInputExpressions().get(0) instanceof TimeSeriesOperand,
+ "The input of aggregate function must be the original time series.");
+ PartialPath inputSeriesPath =
+ ((TimeSeriesOperand) descriptor.getInputExpressions().get(0)).getPath();
+ valueColumnsSizePerLine += getOutputColumnSizePerLine(tsDataType, inputSeriesPath);
}
}
@@ -212,4 +206,37 @@ public class AggregationUtil {
* (TimeColumn.SIZE_IN_BYTES_PER_POSITION + valueColumnsSizePerLine))
: valueColumnsSizePerLine;
}
+
+ public static long calculateMaxAggregationResultSizeForLastQuery(
+ List<Aggregator> aggregators, PartialPath inputSeriesPath) {
+ long valueColumnsSizePerLine = 0;
+ List<TSDataType> outPutDataTypes =
+ aggregators.stream()
+ .flatMap(aggregator -> Arrays.stream(aggregator.getOutputType()))
+ .collect(Collectors.toList());
+ for (TSDataType tsDataType : outPutDataTypes) {
+ valueColumnsSizePerLine += getOutputColumnSizePerLine(tsDataType, inputSeriesPath);
+ }
+ return valueColumnsSizePerLine;
+ }
+
+ private static long getOutputColumnSizePerLine(
+ TSDataType tsDataType, PartialPath inputSeriesPath) {
+ switch (tsDataType) {
+ case INT32:
+ return IntColumn.SIZE_IN_BYTES_PER_POSITION;
+ case INT64:
+ return LongColumn.SIZE_IN_BYTES_PER_POSITION;
+ case FLOAT:
+ return FloatColumn.SIZE_IN_BYTES_PER_POSITION;
+ case DOUBLE:
+ return DoubleColumn.SIZE_IN_BYTES_PER_POSITION;
+ case BOOLEAN:
+ return BooleanColumn.SIZE_IN_BYTES_PER_POSITION;
+ case TEXT:
+ return StatisticsManager.getInstance().getMaxBinarySizeInBytes(inputSeriesPath);
+ default:
+ throw new UnsupportedOperationException("Unknown data type " + tsDataType);
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index 3f4764ddf3..e7b3f37885 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -41,7 +41,6 @@ import java.util.concurrent.TimeUnit;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateAggregationFromRawData;
-import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.isAllAggregatorsHasFinalResult;
public abstract class AbstractSeriesAggregationScanOperator implements DataSourceOperator {
@@ -78,6 +77,7 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
SeriesScanUtil seriesScanUtil,
int subSensorSize,
List<Aggregator> aggregators,
+ ITimeRangeIterator timeRangeIterator,
boolean ascending,
GroupByTimeParameter groupByTimeParameter,
long maxReturnSize) {
@@ -88,8 +88,7 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
this.seriesScanUtil = seriesScanUtil;
this.subSensorSize = subSensorSize;
this.aggregators = aggregators;
-
- this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending, true);
+ this.timeRangeIterator = timeRangeIterator;
List<TSDataType> dataTypes = new ArrayList<>();
for (Aggregator aggregator : aggregators) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
index cd7ad09bfc..2ad3eb1a83 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.operator.source;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
@@ -37,6 +38,7 @@ public class AlignedSeriesAggregationScanOperator extends AbstractSeriesAggregat
AlignedPath seriesPath,
OperatorContext context,
List<Aggregator> aggregators,
+ ITimeRangeIterator timeRangeIterator,
Filter timeFilter,
boolean ascending,
GroupByTimeParameter groupByTimeParameter,
@@ -53,6 +55,7 @@ public class AlignedSeriesAggregationScanOperator extends AbstractSeriesAggregat
ascending),
seriesPath.getMeasurementList().size(),
aggregators,
+ timeRangeIterator,
ascending,
groupByTimeParameter,
maxReturnSize);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
index 55547e56ee..99b24e06a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution.operator.source;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
@@ -44,6 +45,7 @@ public class SeriesAggregationScanOperator extends AbstractSeriesAggregationScan
Set<String> allSensors,
OperatorContext context,
List<Aggregator> aggregators,
+ ITimeRangeIterator timeRangeIterator,
Filter timeFilter,
boolean ascending,
GroupByTimeParameter groupByTimeParameter,
@@ -61,6 +63,7 @@ public class SeriesAggregationScanOperator extends AbstractSeriesAggregationScan
ascending),
1,
aggregators,
+ timeRangeIterator,
ascending,
groupByTimeParameter,
maxReturnSize);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index e057d60748..cfb2b42a57 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.aggregation.slidingwindow.SlidingWindowAggregatorFactory;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
@@ -187,6 +188,7 @@ import java.util.Objects;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateMaxAggregationResultSizeForLastQuery;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
import static org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode;
@@ -275,6 +277,58 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
return seriesScanOperator;
}
+ @Override
+ public Operator visitSeriesAggregationScan(
+ SeriesAggregationScanNode node, LocalExecutionPlanContext context) {
+ PartialPath seriesPath = node.getSeriesPath();
+ boolean ascending = node.getScanOrder() == Ordering.ASC;
+ OperatorContext operatorContext =
+ context
+ .getInstanceContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ SeriesAggregationScanOperator.class.getSimpleName());
+
+ List<AggregationDescriptor> aggregationDescriptors = node.getAggregationDescriptorList();
+ List<Aggregator> aggregators = new ArrayList<>();
+ aggregationDescriptors.forEach(
+ o ->
+ aggregators.add(
+ new Aggregator(
+ AccumulatorFactory.createAccumulator(
+ o.getAggregationType(), node.getSeriesPath().getSeriesType(), ascending),
+ o.getStep())));
+
+ GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+ ITimeRangeIterator timeRangeIterator =
+ initTimeRangeIterator(groupByTimeParameter, ascending, true);
+ long maxReturnSize =
+ AggregationUtil.calculateMaxAggregationResultSize(
+ node.getAggregationDescriptorList(),
+ timeRangeIterator,
+ groupByTimeParameter != null,
+ context.getTypeProvider());
+
+ SeriesAggregationScanOperator aggregateScanOperator =
+ new SeriesAggregationScanOperator(
+ node.getPlanNodeId(),
+ seriesPath,
+ context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()),
+ operatorContext,
+ aggregators,
+ timeRangeIterator,
+ node.getTimeFilter(),
+ ascending,
+ node.getGroupByTimeParameter(),
+ maxReturnSize);
+
+ context.addSourceOperator(aggregateScanOperator);
+ context.addPath(seriesPath);
+ context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
+ return aggregateScanOperator;
+ }
+
@Override
public Operator visitAlignedSeriesAggregationScan(
AlignedSeriesAggregationScanNode node, LocalExecutionPlanContext context) {
@@ -311,11 +365,14 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
}
GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+ ITimeRangeIterator timeRangeIterator =
+ initTimeRangeIterator(groupByTimeParameter, ascending, true);
long maxReturnSize =
AggregationUtil.calculateMaxAggregationResultSize(
node.getAggregationDescriptorList(),
- initTimeRangeIterator(groupByTimeParameter, ascending, true),
- groupByTimeParameter != null);
+ timeRangeIterator,
+ groupByTimeParameter != null,
+ context.getTypeProvider());
AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
new AlignedSeriesAggregationScanOperator(
@@ -323,6 +380,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
seriesPath,
operatorContext,
aggregators,
+ timeRangeIterator,
node.getTimeFilter(),
ascending,
groupByTimeParameter,
@@ -566,47 +624,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
return new NodePathsCountOperator(operatorContext, child);
}
- @Override
- public Operator visitSeriesAggregationScan(
- SeriesAggregationScanNode node, LocalExecutionPlanContext context) {
- PartialPath seriesPath = node.getSeriesPath();
- boolean ascending = node.getScanOrder() == Ordering.ASC;
- OperatorContext operatorContext =
- context
- .getInstanceContext()
- .addOperatorContext(
- context.getNextOperatorId(),
- node.getPlanNodeId(),
- SeriesAggregationScanOperator.class.getSimpleName());
-
- List<Aggregator> aggregators = new ArrayList<>();
- node.getAggregationDescriptorList()
- .forEach(
- o ->
- aggregators.add(
- new Aggregator(
- AccumulatorFactory.createAccumulator(
- o.getAggregationType(),
- node.getSeriesPath().getSeriesType(),
- ascending),
- o.getStep())));
- SeriesAggregationScanOperator aggregateScanOperator =
- new SeriesAggregationScanOperator(
- node.getPlanNodeId(),
- seriesPath,
- context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()),
- operatorContext,
- aggregators,
- node.getTimeFilter(),
- ascending,
- node.getGroupByTimeParameter());
-
- context.addSourceOperator(aggregateScanOperator);
- context.addPath(seriesPath);
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
- return aggregateScanOperator;
- }
-
@Override
public Operator visitDeviceView(DeviceViewNode node, LocalExecutionPlanContext context) {
OperatorContext operatorContext =
@@ -1411,6 +1428,10 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
// last_time, last_value
List<Aggregator> aggregators = LastQueryUtil.createAggregators(seriesPath.getSeriesType());
+ ITimeRangeIterator timeRangeIterator = initTimeRangeIterator(null, false, false);
+ long maxReturnSize =
+ calculateMaxAggregationResultSizeForLastQuery(
+ aggregators, seriesPath.transformToPartialPath());
SeriesAggregationScanOperator seriesAggregationScanOperator =
new SeriesAggregationScanOperator(
@@ -1419,9 +1440,11 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()),
operatorContext,
aggregators,
+ timeRangeIterator,
context.getLastQueryTimeFilter(),
false,
- null);
+ null,
+ maxReturnSize);
context.addSourceOperator(seriesAggregationScanOperator);
context.addPath(seriesPath);
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
@@ -1490,15 +1513,22 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
// last_time, last_value
List<Aggregator> aggregators =
LastQueryUtil.createAggregators(seriesPath.getSchemaList().get(0).getType());
+ ITimeRangeIterator timeRangeIterator = initTimeRangeIterator(null, false, false);
+ long maxReturnSize =
+ calculateMaxAggregationResultSizeForLastQuery(
+ aggregators, seriesPath.transformToPartialPath());
+
AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
new AlignedSeriesAggregationScanOperator(
node.getPlanNodeId(),
seriesPath,
operatorContext,
aggregators,
+ timeRangeIterator,
context.getLastQueryTimeFilter(),
false,
- null);
+ null,
+ maxReturnSize);
context.addSourceOperator(seriesAggregationScanOperator);
context.addPath(seriesPath);
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
index 01b2813a81..b3e80f1b82 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
@@ -61,6 +61,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
import static org.junit.Assert.assertEquals;
@@ -320,6 +321,7 @@ public class AggregationOperatorTest {
Collections.singleton("sensor0"),
fragmentInstanceContext.getOperatorContexts().get(0),
aggregators,
+ initTimeRangeIterator(groupByTimeParameter, true, true),
null,
true,
groupByTimeParameter,
@@ -343,6 +345,7 @@ public class AggregationOperatorTest {
Collections.singleton("sensor0"),
fragmentInstanceContext.getOperatorContexts().get(1),
aggregators,
+ initTimeRangeIterator(groupByTimeParameter, true, true),
null,
true,
groupByTimeParameter,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
index 937bf90c27..a2e989f47e 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
@@ -63,6 +63,7 @@ import java.util.stream.Collectors;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -622,10 +623,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
1, planNodeId, SeriesScanOperator.class.getSimpleName());
fragmentInstanceContext
.getOperatorContexts()
- .forEach(
- operatorContext -> {
- operatorContext.setMaxRunTime(TEST_TIME_SLICE);
- });
+ .forEach(operatorContext -> operatorContext.setMaxRunTime(TEST_TIME_SLICE));
AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
new AlignedSeriesAggregationScanOperator(
@@ -633,6 +631,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
alignedPath,
fragmentInstanceContext.getOperatorContexts().get(0),
aggregators,
+ initTimeRangeIterator(groupByTimeParameter, ascending, true),
timeFilter,
ascending,
groupByTimeParameter,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
index a086bbe8f2..4864fed39b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
@@ -56,6 +56,7 @@ import java.util.concurrent.ExecutorService;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -134,6 +135,7 @@ public class LastQueryOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(0),
aggregators1,
+ initTimeRangeIterator(null, false, true),
null,
false,
null,
@@ -157,6 +159,7 @@ public class LastQueryOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(2),
aggregators2,
+ initTimeRangeIterator(null, false, true),
null,
false,
null,
@@ -255,6 +258,7 @@ public class LastQueryOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(0),
aggregators1,
+ initTimeRangeIterator(null, false, true),
null,
false,
null,
@@ -278,6 +282,7 @@ public class LastQueryOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(2),
aggregators2,
+ initTimeRangeIterator(null, false, true),
null,
false,
null,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java
index 2d88d96d9c..6dc84c071b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java
@@ -58,6 +58,7 @@ import java.util.concurrent.ExecutorService;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -136,6 +137,7 @@ public class LastQuerySortOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(0),
aggregators1,
+ initTimeRangeIterator(null, false, true),
null,
false,
null,
@@ -159,6 +161,7 @@ public class LastQuerySortOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(2),
aggregators2,
+ initTimeRangeIterator(null, false, true),
null,
false,
null,
@@ -258,6 +261,7 @@ public class LastQuerySortOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(0),
aggregators1,
+ initTimeRangeIterator(null, false, true),
null,
false,
null,
@@ -281,6 +285,7 @@ public class LastQuerySortOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(2),
aggregators2,
+ initTimeRangeIterator(null, false, true),
null,
false,
null,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
index b30ff5b1b7..f3b37b83f7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
@@ -59,6 +59,7 @@ import java.util.concurrent.ExecutorService;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
import static org.junit.Assert.assertEquals;
@@ -516,6 +517,7 @@ public class SeriesAggregationScanOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(0),
aggregators,
+ initTimeRangeIterator(groupByTimeParameter, ascending, true),
timeFilter,
ascending,
groupByTimeParameter,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
index 4463a45cf5..7eebdedbcc 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
@@ -61,6 +61,7 @@ import java.util.stream.Collectors;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
public class SlidingWindowAggregationOperatorTest {
@@ -233,6 +234,7 @@ public class SlidingWindowAggregationOperatorTest {
Collections.singleton("sensor0"),
fragmentInstanceContext.getOperatorContexts().get(0),
aggregators,
+ initTimeRangeIterator(groupByTimeParameter, ascending, true),
null,
ascending,
groupByTimeParameter,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
index c2e469d224..bff57278b1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
@@ -56,6 +56,7 @@ import java.util.concurrent.ExecutorService;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -212,6 +213,7 @@ public class UpdateLastCacheOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(0),
aggregators,
+ initTimeRangeIterator(groupByTimeParameter, ascending, true),
timeFilter,
ascending,
groupByTimeParameter,