You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/08/14 15:34:01 UTC

[iotdb] branch lmh/AggOpMemoryControl updated: finish memory calculate for SeriesAggregationScanOperator

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/lmh/AggOpMemoryControl by this push:
     new d9c912f413 finish memory calculate for SeriesAggregationScanOperator
d9c912f413 is described below

commit d9c912f4134d58882c1008739f091824d8f7ea16
Author: liuminghui233 <54...@qq.com>
AuthorDate: Sun Aug 14 23:33:21 2022 +0800

    finish memory calculate for SeriesAggregationScanOperator
---
 .../db/mpp/execution/operator/AggregationUtil.java |  79 +++++++++-----
 .../AbstractSeriesAggregationScanOperator.java     |   5 +-
 .../AlignedSeriesAggregationScanOperator.java      |   3 +
 .../source/SeriesAggregationScanOperator.java      |   3 +
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 120 +++++++++++++--------
 .../operator/AggregationOperatorTest.java          |   3 +
 .../AlignedSeriesAggregationScanOperatorTest.java  |   7 +-
 .../execution/operator/LastQueryOperatorTest.java  |   5 +
 .../operator/LastQuerySortOperatorTest.java        |   5 +
 .../SeriesAggregationScanOperatorTest.java         |   2 +
 .../SlidingWindowAggregationOperatorTest.java      |   2 +
 .../operator/UpdateLastCacheOperatorTest.java      |   2 +
 12 files changed, 158 insertions(+), 78 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
index a47a94ff38..1896fc3af4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
@@ -19,10 +19,13 @@
 
 package org.apache.iotdb.db.mpp.execution.operator;
 
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.SingleTimeWindowIterator;
 import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.TimeRangeIteratorFactory;
+import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.statistics.StatisticsManager;
@@ -41,8 +44,11 @@ import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.utils.Pair;
 
+import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static org.apache.iotdb.tsfile.read.common.block.TsBlockUtil.skipPointsOutOfTimeRange;
 
@@ -173,33 +179,21 @@ public class AggregationUtil {
   public static long calculateMaxAggregationResultSize(
       List<AggregationDescriptor> aggregationDescriptors,
       ITimeRangeIterator timeRangeIterator,
-      boolean isGroupByQuery) {
+      boolean isGroupByQuery,
+      TypeProvider typeProvider) {
     long valueColumnsSizePerLine = 0;
-    for (AggregationDescriptor descriptor : aggregationDescriptors) {}
-
-    for (TSDataType tsDataType : outPutDataTypes) {
-      switch (tsDataType) {
-        case INT32:
-          valueColumnsSizePerLine += IntColumn.SIZE_IN_BYTES_PER_POSITION;
-          break;
-        case INT64:
-          valueColumnsSizePerLine += LongColumn.SIZE_IN_BYTES_PER_POSITION;
-          break;
-        case FLOAT:
-          valueColumnsSizePerLine += FloatColumn.SIZE_IN_BYTES_PER_POSITION;
-          break;
-        case DOUBLE:
-          valueColumnsSizePerLine += DoubleColumn.SIZE_IN_BYTES_PER_POSITION;
-          break;
-        case BOOLEAN:
-          valueColumnsSizePerLine += BooleanColumn.SIZE_IN_BYTES_PER_POSITION;
-          break;
-        case TEXT:
-          valueColumnsSizePerLine +=
-              StatisticsManager.getInstance().getMaxBinarySizeInBytes(seriesPath);
-          break;
-        default:
-          throw new UnsupportedOperationException("Unknown data type " + tsDataType);
+    for (AggregationDescriptor descriptor : aggregationDescriptors) {
+      List<TSDataType> outPutDataTypes =
+          descriptor.getOutputColumnNames().stream()
+              .map(typeProvider::getType)
+              .collect(Collectors.toList());
+      for (TSDataType tsDataType : outPutDataTypes) {
+        checkArgument(
+            descriptor.getInputExpressions().get(0) instanceof TimeSeriesOperand,
+            "The input of aggregate function must be the original time series.");
+        PartialPath inputSeriesPath =
+            ((TimeSeriesOperand) descriptor.getInputExpressions().get(0)).getPath();
+        valueColumnsSizePerLine += getOutputColumnSizePerLine(tsDataType, inputSeriesPath);
       }
     }
 
@@ -212,4 +206,37 @@ public class AggregationUtil {
                 * (TimeColumn.SIZE_IN_BYTES_PER_POSITION + valueColumnsSizePerLine))
         : valueColumnsSizePerLine;
   }
+
+  public static long calculateMaxAggregationResultSizeForLastQuery(
+      List<Aggregator> aggregators, PartialPath inputSeriesPath) {
+    long valueColumnsSizePerLine = 0;
+    List<TSDataType> outPutDataTypes =
+        aggregators.stream()
+            .flatMap(aggregator -> Arrays.stream(aggregator.getOutputType()))
+            .collect(Collectors.toList());
+    for (TSDataType tsDataType : outPutDataTypes) {
+      valueColumnsSizePerLine += getOutputColumnSizePerLine(tsDataType, inputSeriesPath);
+    }
+    return valueColumnsSizePerLine;
+  }
+
+  private static long getOutputColumnSizePerLine(
+      TSDataType tsDataType, PartialPath inputSeriesPath) {
+    switch (tsDataType) {
+      case INT32:
+        return IntColumn.SIZE_IN_BYTES_PER_POSITION;
+      case INT64:
+        return LongColumn.SIZE_IN_BYTES_PER_POSITION;
+      case FLOAT:
+        return FloatColumn.SIZE_IN_BYTES_PER_POSITION;
+      case DOUBLE:
+        return DoubleColumn.SIZE_IN_BYTES_PER_POSITION;
+      case BOOLEAN:
+        return BooleanColumn.SIZE_IN_BYTES_PER_POSITION;
+      case TEXT:
+        return StatisticsManager.getInstance().getMaxBinarySizeInBytes(inputSeriesPath);
+      default:
+        throw new UnsupportedOperationException("Unknown data type " + tsDataType);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index 3f4764ddf3..e7b3f37885 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -41,7 +41,6 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateAggregationFromRawData;
-import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.isAllAggregatorsHasFinalResult;
 
 public abstract class AbstractSeriesAggregationScanOperator implements DataSourceOperator {
@@ -78,6 +77,7 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
       SeriesScanUtil seriesScanUtil,
       int subSensorSize,
       List<Aggregator> aggregators,
+      ITimeRangeIterator timeRangeIterator,
       boolean ascending,
       GroupByTimeParameter groupByTimeParameter,
       long maxReturnSize) {
@@ -88,8 +88,7 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
     this.seriesScanUtil = seriesScanUtil;
     this.subSensorSize = subSensorSize;
     this.aggregators = aggregators;
-
-    this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending, true);
+    this.timeRangeIterator = timeRangeIterator;
 
     List<TSDataType> dataTypes = new ArrayList<>();
     for (Aggregator aggregator : aggregators) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
index cd7ad09bfc..2ad3eb1a83 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.operator.source;
 
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
@@ -37,6 +38,7 @@ public class AlignedSeriesAggregationScanOperator extends AbstractSeriesAggregat
       AlignedPath seriesPath,
       OperatorContext context,
       List<Aggregator> aggregators,
+      ITimeRangeIterator timeRangeIterator,
       Filter timeFilter,
       boolean ascending,
       GroupByTimeParameter groupByTimeParameter,
@@ -53,6 +55,7 @@ public class AlignedSeriesAggregationScanOperator extends AbstractSeriesAggregat
             ascending),
         seriesPath.getMeasurementList().size(),
         aggregators,
+        timeRangeIterator,
         ascending,
         groupByTimeParameter,
         maxReturnSize);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
index 55547e56ee..99b24e06a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution.operator.source;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
@@ -44,6 +45,7 @@ public class SeriesAggregationScanOperator extends AbstractSeriesAggregationScan
       Set<String> allSensors,
       OperatorContext context,
       List<Aggregator> aggregators,
+      ITimeRangeIterator timeRangeIterator,
       Filter timeFilter,
       boolean ascending,
       GroupByTimeParameter groupByTimeParameter,
@@ -61,6 +63,7 @@ public class SeriesAggregationScanOperator extends AbstractSeriesAggregationScan
             ascending),
         1,
         aggregators,
+        timeRangeIterator,
         ascending,
         groupByTimeParameter,
         maxReturnSize);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index e057d60748..cfb2b42a57 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.aggregation.slidingwindow.SlidingWindowAggregatorFactory;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
 import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
@@ -187,6 +188,7 @@ import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateMaxAggregationResultSizeForLastQuery;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 import static org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode;
 
@@ -275,6 +277,58 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     return seriesScanOperator;
   }
 
+  @Override
+  public Operator visitSeriesAggregationScan(
+      SeriesAggregationScanNode node, LocalExecutionPlanContext context) {
+    PartialPath seriesPath = node.getSeriesPath();
+    boolean ascending = node.getScanOrder() == Ordering.ASC;
+    OperatorContext operatorContext =
+        context
+            .getInstanceContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                SeriesAggregationScanOperator.class.getSimpleName());
+
+    List<AggregationDescriptor> aggregationDescriptors = node.getAggregationDescriptorList();
+    List<Aggregator> aggregators = new ArrayList<>();
+    aggregationDescriptors.forEach(
+        o ->
+            aggregators.add(
+                new Aggregator(
+                    AccumulatorFactory.createAccumulator(
+                        o.getAggregationType(), node.getSeriesPath().getSeriesType(), ascending),
+                    o.getStep())));
+
+    GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+    ITimeRangeIterator timeRangeIterator =
+        initTimeRangeIterator(groupByTimeParameter, ascending, true);
+    long maxReturnSize =
+        AggregationUtil.calculateMaxAggregationResultSize(
+            node.getAggregationDescriptorList(),
+            timeRangeIterator,
+            groupByTimeParameter != null,
+            context.getTypeProvider());
+
+    SeriesAggregationScanOperator aggregateScanOperator =
+        new SeriesAggregationScanOperator(
+            node.getPlanNodeId(),
+            seriesPath,
+            context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()),
+            operatorContext,
+            aggregators,
+            timeRangeIterator,
+            node.getTimeFilter(),
+            ascending,
+            node.getGroupByTimeParameter(),
+            maxReturnSize);
+
+    context.addSourceOperator(aggregateScanOperator);
+    context.addPath(seriesPath);
+    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
+    return aggregateScanOperator;
+  }
+
   @Override
   public Operator visitAlignedSeriesAggregationScan(
       AlignedSeriesAggregationScanNode node, LocalExecutionPlanContext context) {
@@ -311,11 +365,14 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     }
 
     GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+    ITimeRangeIterator timeRangeIterator =
+        initTimeRangeIterator(groupByTimeParameter, ascending, true);
     long maxReturnSize =
         AggregationUtil.calculateMaxAggregationResultSize(
             node.getAggregationDescriptorList(),
-            initTimeRangeIterator(groupByTimeParameter, ascending, true),
-            groupByTimeParameter != null);
+            timeRangeIterator,
+            groupByTimeParameter != null,
+            context.getTypeProvider());
 
     AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
         new AlignedSeriesAggregationScanOperator(
@@ -323,6 +380,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
             seriesPath,
             operatorContext,
             aggregators,
+            timeRangeIterator,
             node.getTimeFilter(),
             ascending,
             groupByTimeParameter,
@@ -566,47 +624,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     return new NodePathsCountOperator(operatorContext, child);
   }
 
-  @Override
-  public Operator visitSeriesAggregationScan(
-      SeriesAggregationScanNode node, LocalExecutionPlanContext context) {
-    PartialPath seriesPath = node.getSeriesPath();
-    boolean ascending = node.getScanOrder() == Ordering.ASC;
-    OperatorContext operatorContext =
-        context
-            .getInstanceContext()
-            .addOperatorContext(
-                context.getNextOperatorId(),
-                node.getPlanNodeId(),
-                SeriesAggregationScanOperator.class.getSimpleName());
-
-    List<Aggregator> aggregators = new ArrayList<>();
-    node.getAggregationDescriptorList()
-        .forEach(
-            o ->
-                aggregators.add(
-                    new Aggregator(
-                        AccumulatorFactory.createAccumulator(
-                            o.getAggregationType(),
-                            node.getSeriesPath().getSeriesType(),
-                            ascending),
-                        o.getStep())));
-    SeriesAggregationScanOperator aggregateScanOperator =
-        new SeriesAggregationScanOperator(
-            node.getPlanNodeId(),
-            seriesPath,
-            context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()),
-            operatorContext,
-            aggregators,
-            node.getTimeFilter(),
-            ascending,
-            node.getGroupByTimeParameter());
-
-    context.addSourceOperator(aggregateScanOperator);
-    context.addPath(seriesPath);
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
-    return aggregateScanOperator;
-  }
-
   @Override
   public Operator visitDeviceView(DeviceViewNode node, LocalExecutionPlanContext context) {
     OperatorContext operatorContext =
@@ -1411,6 +1428,10 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
 
     // last_time, last_value
     List<Aggregator> aggregators = LastQueryUtil.createAggregators(seriesPath.getSeriesType());
+    ITimeRangeIterator timeRangeIterator = initTimeRangeIterator(null, false, false);
+    long maxReturnSize =
+        calculateMaxAggregationResultSizeForLastQuery(
+            aggregators, seriesPath.transformToPartialPath());
 
     SeriesAggregationScanOperator seriesAggregationScanOperator =
         new SeriesAggregationScanOperator(
@@ -1419,9 +1440,11 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
             context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()),
             operatorContext,
             aggregators,
+            timeRangeIterator,
             context.getLastQueryTimeFilter(),
             false,
-            null);
+            null,
+            maxReturnSize);
     context.addSourceOperator(seriesAggregationScanOperator);
     context.addPath(seriesPath);
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
@@ -1490,15 +1513,22 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     // last_time, last_value
     List<Aggregator> aggregators =
         LastQueryUtil.createAggregators(seriesPath.getSchemaList().get(0).getType());
+    ITimeRangeIterator timeRangeIterator = initTimeRangeIterator(null, false, false);
+    long maxReturnSize =
+        calculateMaxAggregationResultSizeForLastQuery(
+            aggregators, seriesPath.transformToPartialPath());
+
     AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
         new AlignedSeriesAggregationScanOperator(
             node.getPlanNodeId(),
             seriesPath,
             operatorContext,
             aggregators,
+            timeRangeIterator,
             context.getLastQueryTimeFilter(),
             false,
-            null);
+            null,
+            maxReturnSize);
     context.addSourceOperator(seriesAggregationScanOperator);
     context.addPath(seriesPath);
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
index 01b2813a81..b3e80f1b82 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
@@ -61,6 +61,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static org.junit.Assert.assertEquals;
 
@@ -320,6 +321,7 @@ public class AggregationOperatorTest {
             Collections.singleton("sensor0"),
             fragmentInstanceContext.getOperatorContexts().get(0),
             aggregators,
+            initTimeRangeIterator(groupByTimeParameter, true, true),
             null,
             true,
             groupByTimeParameter,
@@ -343,6 +345,7 @@ public class AggregationOperatorTest {
             Collections.singleton("sensor0"),
             fragmentInstanceContext.getOperatorContexts().get(1),
             aggregators,
+            initTimeRangeIterator(groupByTimeParameter, true, true),
             null,
             true,
             groupByTimeParameter,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
index 937bf90c27..a2e989f47e 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
@@ -63,6 +63,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -622,10 +623,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
         1, planNodeId, SeriesScanOperator.class.getSimpleName());
     fragmentInstanceContext
         .getOperatorContexts()
-        .forEach(
-            operatorContext -> {
-              operatorContext.setMaxRunTime(TEST_TIME_SLICE);
-            });
+        .forEach(operatorContext -> operatorContext.setMaxRunTime(TEST_TIME_SLICE));
 
     AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
         new AlignedSeriesAggregationScanOperator(
@@ -633,6 +631,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
             alignedPath,
             fragmentInstanceContext.getOperatorContexts().get(0),
             aggregators,
+            initTimeRangeIterator(groupByTimeParameter, ascending, true),
             timeFilter,
             ascending,
             groupByTimeParameter,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
index a086bbe8f2..4864fed39b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
@@ -56,6 +56,7 @@ import java.util.concurrent.ExecutorService;
 
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -134,6 +135,7 @@ public class LastQueryOperatorTest {
               allSensors,
               fragmentInstanceContext.getOperatorContexts().get(0),
               aggregators1,
+              initTimeRangeIterator(null, false, true),
               null,
               false,
               null,
@@ -157,6 +159,7 @@ public class LastQueryOperatorTest {
               allSensors,
               fragmentInstanceContext.getOperatorContexts().get(2),
               aggregators2,
+              initTimeRangeIterator(null, false, true),
               null,
               false,
               null,
@@ -255,6 +258,7 @@ public class LastQueryOperatorTest {
               allSensors,
               fragmentInstanceContext.getOperatorContexts().get(0),
               aggregators1,
+              initTimeRangeIterator(null, false, true),
               null,
               false,
               null,
@@ -278,6 +282,7 @@ public class LastQueryOperatorTest {
               allSensors,
               fragmentInstanceContext.getOperatorContexts().get(2),
               aggregators2,
+              initTimeRangeIterator(null, false, true),
               null,
               false,
               null,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java
index 2d88d96d9c..6dc84c071b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java
@@ -58,6 +58,7 @@ import java.util.concurrent.ExecutorService;
 
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -136,6 +137,7 @@ public class LastQuerySortOperatorTest {
               allSensors,
               fragmentInstanceContext.getOperatorContexts().get(0),
               aggregators1,
+              initTimeRangeIterator(null, false, true),
               null,
               false,
               null,
@@ -159,6 +161,7 @@ public class LastQuerySortOperatorTest {
               allSensors,
               fragmentInstanceContext.getOperatorContexts().get(2),
               aggregators2,
+              initTimeRangeIterator(null, false, true),
               null,
               false,
               null,
@@ -258,6 +261,7 @@ public class LastQuerySortOperatorTest {
               allSensors,
               fragmentInstanceContext.getOperatorContexts().get(0),
               aggregators1,
+              initTimeRangeIterator(null, false, true),
               null,
               false,
               null,
@@ -281,6 +285,7 @@ public class LastQuerySortOperatorTest {
               allSensors,
               fragmentInstanceContext.getOperatorContexts().get(2),
               aggregators2,
+              initTimeRangeIterator(null, false, true),
               null,
               false,
               null,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
index b30ff5b1b7..f3b37b83f7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
@@ -59,6 +59,7 @@ import java.util.concurrent.ExecutorService;
 
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static org.junit.Assert.assertEquals;
 
@@ -516,6 +517,7 @@ public class SeriesAggregationScanOperatorTest {
             allSensors,
             fragmentInstanceContext.getOperatorContexts().get(0),
             aggregators,
+            initTimeRangeIterator(groupByTimeParameter, ascending, true),
             timeFilter,
             ascending,
             groupByTimeParameter,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
index 4463a45cf5..7eebdedbcc 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
@@ -61,6 +61,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 public class SlidingWindowAggregationOperatorTest {
@@ -233,6 +234,7 @@ public class SlidingWindowAggregationOperatorTest {
             Collections.singleton("sensor0"),
             fragmentInstanceContext.getOperatorContexts().get(0),
             aggregators,
+            initTimeRangeIterator(groupByTimeParameter, ascending, true),
             null,
             ascending,
             groupByTimeParameter,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
index c2e469d224..bff57278b1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
@@ -56,6 +56,7 @@ import java.util.concurrent.ExecutorService;
 
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -212,6 +213,7 @@ public class UpdateLastCacheOperatorTest {
             allSensors,
             fragmentInstanceContext.getOperatorContexts().get(0),
             aggregators,
+            initTimeRangeIterator(groupByTimeParameter, ascending, true),
             timeFilter,
             ascending,
             groupByTimeParameter,