You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/08/14 16:16:13 UTC

[iotdb] branch lmh/AggOpMemoryControl updated (d9c912f413 -> 5e9192cdf2)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a change to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git


    from d9c912f413 finish memory calculate for SeriesAggregationScanOperator
     new c1691b4205 finish memory calculate for SingleInputAggregationOperator
     new 5e9192cdf2 finish memory calculate for AggregationOperator

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../db/mpp/execution/operator/AggregationUtil.java |  2 +-
 .../operator/process/AggregationOperator.java      | 36 +++++------
 .../process/RawDataAggregationOperator.java        | 23 ++------
 .../process/SingleInputAggregationOperator.java    | 31 +++++++---
 .../process/SlidingWindowAggregationOperator.java  | 17 ++----
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 69 ++++++++++++++++++++--
 .../operator/AggregationOperatorTest.java          |  5 +-
 .../operator/RawDataAggregationOperatorTest.java   |  5 +-
 .../SlidingWindowAggregationOperatorTest.java      |  4 +-
 9 files changed, 125 insertions(+), 67 deletions(-)


[iotdb] 01/02: finish memory calculate for SingleInputAggregationOperator

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c1691b42055fe6429821d10662aa215826e66c39
Author: liuminghui233 <54...@qq.com>
AuthorDate: Sun Aug 14 23:58:32 2022 +0800

    finish memory calculate for SingleInputAggregationOperator
---
 .../process/RawDataAggregationOperator.java        | 23 +++---------
 .../process/SingleInputAggregationOperator.java    | 31 ++++++++++++----
 .../process/SlidingWindowAggregationOperator.java  | 17 +++------
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 41 ++++++++++++++++++++--
 .../operator/RawDataAggregationOperatorTest.java   |  5 ++-
 .../SlidingWindowAggregationOperatorTest.java      |  4 ++-
 6 files changed, 78 insertions(+), 43 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index c88345c901..d95968850d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -20,17 +20,15 @@
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.util.List;
 
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateAggregationFromRawData;
-import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 /**
  * RawDataAggregationOperator is used to process raw data tsBlock input calculating using value
@@ -43,13 +41,15 @@ import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEF
  * <p>Return aggregation result in many time intervals once.
  */
 public class RawDataAggregationOperator extends SingleInputAggregationOperator {
+
   public RawDataAggregationOperator(
       OperatorContext operatorContext,
       List<Aggregator> aggregators,
+      ITimeRangeIterator timeRangeIterator,
       Operator child,
       boolean ascending,
-      GroupByTimeParameter groupByTimeParameter) {
-    super(operatorContext, aggregators, child, ascending, groupByTimeParameter, true);
+      long maxReturnSize) {
+    super(operatorContext, aggregators, child, ascending, timeRangeIterator, maxReturnSize);
   }
 
   @Override
@@ -81,17 +81,4 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
     inputTsBlock = calcResult.getRight();
     return calcResult.getLeft();
   }
-
-  @Override
-  public long calculateMaxPeekMemory() {
-    return calculateMaxReturnSize() + child.calculateMaxReturnSize();
-  }
-
-  @Override
-  public long calculateMaxReturnSize() {
-    // time + all value columns
-    return (1L + inputTsBlock.getValueColumnCount())
-            * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
-        + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
index c59de13988..1d746f1150 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -37,7 +36,6 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
-import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 
 public abstract class SingleInputAggregationOperator implements ProcessOperator {
 
@@ -57,26 +55,30 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
   // using for building result tsBlock
   protected final TsBlockBuilder resultTsBlockBuilder;
 
+  protected final long maxRetainedSize;
+  protected final long maxReturnSize;
+
   public SingleInputAggregationOperator(
       OperatorContext operatorContext,
       List<Aggregator> aggregators,
       Operator child,
       boolean ascending,
-      GroupByTimeParameter groupByTimeParameter,
-      boolean outputPartialTimeWindow) {
+      ITimeRangeIterator timeRangeIterator,
+      long maxReturnSize) {
     this.operatorContext = operatorContext;
     this.ascending = ascending;
     this.child = child;
     this.aggregators = aggregators;
-
-    this.timeRangeIterator =
-        initTimeRangeIterator(groupByTimeParameter, ascending, outputPartialTimeWindow);
+    this.timeRangeIterator = timeRangeIterator;
 
     List<TSDataType> dataTypes = new ArrayList<>();
     for (Aggregator aggregator : aggregators) {
       dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
     }
     this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+
+    this.maxRetainedSize = child.calculateMaxReturnSize();
+    this.maxReturnSize = maxReturnSize;
   }
 
   @Override
@@ -146,4 +148,19 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
     curTimeRange = null;
     appendAggregationResult(resultTsBlockBuilder, aggregators, timeRangeIterator);
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return maxReturnSize + maxRetainedSize + child.calculateRetainedSizeAfterCallingNext();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return maxRetainedSize + child.calculateRetainedSizeAfterCallingNext();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
index 9f1050772c..d73491d3e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -30,7 +30,6 @@ import java.util.List;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
-import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 public class SlidingWindowAggregationOperator extends SingleInputAggregationOperator {
 
@@ -41,10 +40,12 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper
   public SlidingWindowAggregationOperator(
       OperatorContext operatorContext,
       List<Aggregator> aggregators,
+      ITimeRangeIterator timeRangeIterator,
       Operator child,
       boolean ascending,
-      GroupByTimeParameter groupByTimeParameter) {
-    super(operatorContext, aggregators, child, ascending, groupByTimeParameter, false);
+      GroupByTimeParameter groupByTimeParameter,
+      long maxReturnSize) {
+    super(operatorContext, aggregators, child, ascending, timeRangeIterator, maxReturnSize);
     checkArgument(
         groupByTimeParameter != null,
         "GroupByTimeParameter cannot be null in SlidingWindowAggregationOperator");
@@ -105,14 +106,4 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper
     }
     curSubTimeRange = null;
   }
-
-  @Override
-  public long calculateMaxPeekMemory() {
-    return calculateMaxReturnSize() + child.calculateMaxReturnSize();
-  }
-
-  @Override
-  public long calculateMaxReturnSize() {
-    return 2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index cfb2b42a57..50d65c2bbb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -188,6 +188,7 @@ import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateMaxAggregationResultSize;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateMaxAggregationResultSizeForLastQuery;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 import static org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode;
@@ -1088,7 +1089,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     boolean ascending = node.getScanOrder() == Ordering.ASC;
     List<Aggregator> aggregators = new ArrayList<>();
     Map<String, List<InputLocation>> layout = makeLayout(node);
-    for (AggregationDescriptor descriptor : node.getAggregationDescriptorList()) {
+    List<AggregationDescriptor> aggregationDescriptors = node.getAggregationDescriptorList();
+    for (AggregationDescriptor descriptor : aggregationDescriptors) {
       List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout);
       aggregators.add(
           SlidingWindowAggregatorFactory.createSlidingWindowAggregator(
@@ -1102,9 +1104,25 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
               descriptor.getStep()));
     }
 
+    GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+    ITimeRangeIterator timeRangeIterator =
+        initTimeRangeIterator(groupByTimeParameter, ascending, false);
+    long maxReturnSize =
+        calculateMaxAggregationResultSize(
+            aggregationDescriptors,
+            timeRangeIterator,
+            groupByTimeParameter != null,
+            context.getTypeProvider());
+
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
     return new SlidingWindowAggregationOperator(
-        operatorContext, aggregators, child, ascending, node.getGroupByTimeParameter());
+        operatorContext,
+        aggregators,
+        timeRangeIterator,
+        child,
+        ascending,
+        groupByTimeParameter,
+        maxReturnSize);
   }
 
   @Override
@@ -1149,6 +1167,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     boolean ascending = node.getScanOrder() == Ordering.ASC;
     List<Aggregator> aggregators = new ArrayList<>();
     Map<String, List<InputLocation>> layout = makeLayout(node);
+    List<AggregationDescriptor> aggregationDescriptors = node.getAggregationDescriptorList();
     for (AggregationDescriptor descriptor : node.getAggregationDescriptorList()) {
       List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout);
       aggregators.add(
@@ -1174,8 +1193,24 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                   node.getPlanNodeId(),
                   RawDataAggregationOperator.class.getSimpleName());
       context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
+
+      GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+      ITimeRangeIterator timeRangeIterator =
+          initTimeRangeIterator(groupByTimeParameter, ascending, true);
+      long maxReturnSize =
+          calculateMaxAggregationResultSize(
+              aggregationDescriptors,
+              timeRangeIterator,
+              groupByTimeParameter != null,
+              context.getTypeProvider());
+
       return new RawDataAggregationOperator(
-          operatorContext, aggregators, children.get(0), ascending, node.getGroupByTimeParameter());
+          operatorContext,
+          aggregators,
+          timeRangeIterator,
+          children.get(0),
+          ascending,
+          maxReturnSize);
     } else {
       OperatorContext operatorContext =
           context
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
index 6a9902f24b..f25b0d2fcd 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
@@ -64,6 +64,8 @@ import java.util.concurrent.ExecutorService;
 
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static org.junit.Assert.assertEquals;
 
 public class RawDataAggregationOperatorTest {
@@ -381,8 +383,9 @@ public class RawDataAggregationOperatorTest {
     return new RawDataAggregationOperator(
         fragmentInstanceContext.getOperatorContexts().get(3),
         aggregators,
+        initTimeRangeIterator(groupByTimeParameter, true, true),
         timeJoinOperator,
         true,
-        groupByTimeParameter);
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
index 7eebdedbcc..a0ed242e63 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
@@ -258,8 +258,10 @@ public class SlidingWindowAggregationOperatorTest {
     return new SlidingWindowAggregationOperator(
         fragmentInstanceContext.getOperatorContexts().get(1),
         finalAggregators,
+        initTimeRangeIterator(groupByTimeParameter, ascending, false),
         seriesAggregationScanOperator,
         ascending,
-        groupByTimeParameter);
+        groupByTimeParameter,
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
   }
 }


[iotdb] 02/02: finish memory calculate for AggregationOperator

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5e9192cdf2041fe8db4af599dcf25973906abf24
Author: liuminghui233 <54...@qq.com>
AuthorDate: Mon Aug 15 00:16:00 2022 +0800

    finish memory calculate for AggregationOperator
---
 .../db/mpp/execution/operator/AggregationUtil.java |  2 +-
 .../operator/process/AggregationOperator.java      | 36 ++++++++++++----------
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 30 +++++++++++++++---
 .../operator/AggregationOperatorTest.java          |  5 ++-
 4 files changed, 48 insertions(+), 25 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
index 1896fc3af4..564f21245e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
@@ -177,7 +177,7 @@ public class AggregationUtil {
   }
 
   public static long calculateMaxAggregationResultSize(
-      List<AggregationDescriptor> aggregationDescriptors,
+      List<? extends AggregationDescriptor> aggregationDescriptors,
       ITimeRangeIterator timeRangeIterator,
       boolean isGroupByQuery,
       TypeProvider typeProvider) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 92da980d1e..7efe992cdd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -37,8 +36,6 @@ import java.util.concurrent.TimeUnit;
 
 import static com.google.common.util.concurrent.Futures.successfulAsList;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
-import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
-import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 /**
  * AggregationOperator can process the situation: aggregation of intermediate aggregate result, it
@@ -63,16 +60,20 @@ public class AggregationOperator implements ProcessOperator {
   // using for building result tsBlock
   private final TsBlockBuilder resultTsBlockBuilder;
 
+  private final long maxRetainedSize;
+  private final long childrenRetainedSize;
+  private final long maxReturnSize;
+
   public AggregationOperator(
       OperatorContext operatorContext,
       List<Aggregator> aggregators,
+      ITimeRangeIterator timeRangeIterator,
       List<Operator> children,
-      boolean ascending,
-      GroupByTimeParameter groupByTimeParameter,
-      boolean outputPartialTimeWindow) {
+      long maxReturnSize) {
     this.operatorContext = operatorContext;
     this.children = children;
     this.aggregators = aggregators;
+    this.timeRangeIterator = timeRangeIterator;
 
     this.inputOperatorsCount = children.size();
     this.inputTsBlocks = new TsBlock[inputOperatorsCount];
@@ -81,14 +82,16 @@ public class AggregationOperator implements ProcessOperator {
       canCallNext[i] = false;
     }
 
-    this.timeRangeIterator =
-        initTimeRangeIterator(groupByTimeParameter, ascending, outputPartialTimeWindow);
-
     List<TSDataType> dataTypes = new ArrayList<>();
     for (Aggregator aggregator : aggregators) {
       dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
     }
     this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+
+    this.maxRetainedSize = children.stream().mapToLong(Operator::calculateMaxReturnSize).sum();
+    this.childrenRetainedSize =
+        children.stream().mapToLong(Operator::calculateRetainedSizeAfterCallingNext).sum();
+    this.maxReturnSize = maxReturnSize;
   }
 
   @Override
@@ -98,18 +101,17 @@ public class AggregationOperator implements ProcessOperator {
 
   @Override
   public long calculateMaxPeekMemory() {
-    long maxPeekMemory = calculateMaxReturnSize();
-    long childrenMaxPeekMemory = 0;
-    for (Operator child : children) {
-      maxPeekMemory += child.calculateMaxReturnSize();
-      childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
-    }
-    return Math.max(maxPeekMemory, childrenMaxPeekMemory);
+    return maxReturnSize + maxRetainedSize + childrenRetainedSize;
   }
 
   @Override
   public long calculateMaxReturnSize() {
-    return (1L + inputOperatorsCount) * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return maxRetainedSize + childrenRetainedSize;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 50d65c2bbb..0978a006f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -1045,7 +1045,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     boolean ascending = node.getScanOrder() == Ordering.ASC;
     List<Aggregator> aggregators = new ArrayList<>();
     Map<String, List<InputLocation>> layout = makeLayout(node);
-    for (GroupByLevelDescriptor descriptor : node.getGroupByLevelDescriptors()) {
+    List<GroupByLevelDescriptor> aggregationDescriptors = node.getGroupByLevelDescriptors();
+    for (GroupByLevelDescriptor descriptor : aggregationDescriptors) {
       List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout);
       TSDataType seriesDataType =
           context
@@ -1067,9 +1068,19 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                 node.getPlanNodeId(),
                 AggregationOperator.class.getSimpleName());
 
+    GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+    ITimeRangeIterator timeRangeIterator =
+        initTimeRangeIterator(groupByTimeParameter, ascending, false);
+    long maxReturnSize =
+        calculateMaxAggregationResultSize(
+            aggregationDescriptors,
+            timeRangeIterator,
+            groupByTimeParameter != null,
+            context.getTypeProvider());
+
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
     return new AggregationOperator(
-        operatorContext, aggregators, children, ascending, node.getGroupByTimeParameter(), false);
+        operatorContext, aggregators, timeRangeIterator, children, maxReturnSize);
   }
 
   @Override
@@ -1183,6 +1194,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
               inputLocationList));
     }
     boolean inputRaw = node.getAggregationDescriptorList().get(0).getStep().isInputRaw();
+    GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+
     if (inputRaw) {
       checkArgument(children.size() == 1, "rawDataAggregateOperator can only accept one input");
       OperatorContext operatorContext =
@@ -1194,7 +1207,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                   RawDataAggregationOperator.class.getSimpleName());
       context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
 
-      GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
       ITimeRangeIterator timeRangeIterator =
           initTimeRangeIterator(groupByTimeParameter, ascending, true);
       long maxReturnSize =
@@ -1219,9 +1231,19 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                   context.getNextOperatorId(),
                   node.getPlanNodeId(),
                   AggregationOperator.class.getSimpleName());
+
+      ITimeRangeIterator timeRangeIterator =
+          initTimeRangeIterator(groupByTimeParameter, ascending, true);
+      long maxReturnSize =
+          calculateMaxAggregationResultSize(
+              aggregationDescriptors,
+              timeRangeIterator,
+              groupByTimeParameter != null,
+              context.getTypeProvider());
+
       context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
       return new AggregationOperator(
-          operatorContext, aggregators, children, ascending, node.getGroupByTimeParameter(), true);
+          operatorContext, aggregators, timeRangeIterator, children, maxReturnSize);
     }
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
index b3e80f1b82..85e65849a6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
@@ -374,9 +374,8 @@ public class AggregationOperatorTest {
     return new AggregationOperator(
         fragmentInstanceContext.getOperatorContexts().get(2),
         finalAggregators,
+        initTimeRangeIterator(groupByTimeParameter, true, true),
         children,
-        true,
-        groupByTimeParameter,
-        true);
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
   }
 }