You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/10 13:03:32 UTC

[iotdb] branch MemoryControl updated: implement calculateRetainedSizeAfterCallingNext in some operators

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch MemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/MemoryControl by this push:
     new 30ff39724d implement calculateRetainedSizeAfterCallingNext in some operators
30ff39724d is described below

commit 30ff39724d7c607a54bd74cb20b6b69a31ebef2e
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Wed Aug 10 21:03:23 2022 +0800

    implement calculateRetainedSizeAfterCallingNext in some operators
---
 .../iotdb/db/mpp/execution/operator/Operator.java  |  7 +-
 .../execution/operator/process/FillOperator.java   |  8 ++-
 .../execution/operator/process/LimitOperator.java  |  5 ++
 .../operator/process/LinearFillOperator.java       |  8 ++-
 .../execution/operator/process/OffsetOperator.java |  5 ++
 .../execution/operator/process/SortOperator.java   |  5 ++
 .../process/join/RowBasedTimeJoinOperator.java     | 22 +++++-
 .../operator/process/join/TimeJoinOperator.java    | 22 +++++-
 .../process/last/LastQueryCollectOperator.java     | 10 +++
 .../process/last/LastQueryMergeOperator.java       | 31 ++++++--
 .../operator/process/last/LastQueryOperator.java   | 14 +++-
 .../process/last/LastQuerySortOperator.java        | 19 +++--
 .../process/last/UpdateLastCacheOperator.java      |  5 ++
 .../operator/source/AlignedSeriesScanOperator.java |  5 ++
 .../operator/source/ExchangeOperator.java          |  5 ++
 .../operator/source/LastCacheScanOperator.java     |  5 ++
 .../operator/source/SeriesScanOperator.java        |  5 ++
 .../mpp/execution/operator/OperatorMemoryTest.java | 84 +++++++++++++++++-----
 18 files changed, 221 insertions(+), 44 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
index 5ee15999b1..79ed840ebc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
@@ -59,7 +59,8 @@ public interface Operator extends AutoCloseable {
    * be like: long estimatedOfCurrentOperator = XXXXX; return max(estimatedOfCurrentOperator,
    * child1.calculateMaxPeekMemory(), child2.calculateMaxPeekMemory(), ....)
    *
-   * Each operator's MaxPeekMemory should also take retained size of each child operator into account.
+   * <p>Each operator's MaxPeekMemory should also take retained size of each child operator into
+   * account.
    *
    * @return estimated max memory footprint that the Operator Tree(rooted from this operator) will
    *     use while doing its own query processing
@@ -75,9 +76,9 @@ public interface Operator extends AutoCloseable {
   }
 
   // TODO remove the default while completing all the operators
-
   /**
-   * @return each operator's retained size after calling its next() method
+   * @return each operator's retained size(including all its children's retained size) after calling
+   *     its next() method
    */
   default long calculateRetainedSizeAfterCallingNext() {
     return 0L;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
index 94fa08fcd9..c182168bd3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
@@ -97,7 +97,13 @@ public class FillOperator implements ProcessOperator {
     // while doing constant and previous fill, we may need to copy the corresponding column if there
     // exists null values
     // so the max peek memory may be double
-    return 2 * child.calculateMaxPeekMemory();
+    return 2 * child.calculateMaxPeekMemory() + child.calculateRetainedSizeAfterCallingNext();
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    // we can safely ignore one line cached in IFill
+    return child.calculateRetainedSizeAfterCallingNext();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
index 4c5d608902..726f5e7ecf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
@@ -90,4 +90,9 @@ public class LimitOperator implements ProcessOperator {
   public long calculateMaxReturnSize() {
     return child.calculateMaxReturnSize();
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return child.calculateRetainedSizeAfterCallingNext();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
index ee69557074..bcbb92a932 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
@@ -166,7 +166,7 @@ public class LinearFillOperator implements ProcessOperator {
     // values, and we may also need to cache next TsBlock to get next not null value
     // so the max peek memory may be triple or more, here we just use 3 as the estimated factor
     // because in most cases, we will get next not null value in next TsBlock
-    return 3 * child.calculateMaxPeekMemory();
+    return 3 * child.calculateMaxPeekMemory() + child.calculateRetainedSizeAfterCallingNext();
   }
 
   @Override
@@ -174,6 +174,12 @@ public class LinearFillOperator implements ProcessOperator {
     return child.calculateMaxReturnSize();
   }
 
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    // we can safely ignore two lines cached in LinearFill
+    return child.calculateRetainedSizeAfterCallingNext();
+  }
+
   /**
    * Judge whether we can use current cached TsBlock to fill Column
    *
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
index a329b25346..572738d081 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
@@ -89,4 +89,9 @@ public class OffsetOperator implements ProcessOperator {
   public long calculateMaxReturnSize() {
     return child.calculateMaxReturnSize();
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return child.calculateRetainedSizeAfterCallingNext();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
index a7cd8d046a..5fa693b2a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
@@ -64,4 +64,9 @@ public class SortOperator implements ProcessOperator {
   public long calculateMaxReturnSize() {
     return 0;
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
index 386f840d6c..695893d400 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
@@ -247,12 +247,16 @@ public class RowBasedTimeJoinOperator implements ProcessOperator {
 
   @Override
   public long calculateMaxPeekMemory() {
-    long maxPeekMemory = calculateMaxReturnSize();
+    long maxPeekMemory = 0;
     long childrenMaxPeekMemory = 0;
     for (Operator child : children) {
-      maxPeekMemory += child.calculateMaxReturnSize();
-      childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+      childrenMaxPeekMemory =
+          Math.max(childrenMaxPeekMemory, maxPeekMemory + child.calculateMaxPeekMemory());
+      maxPeekMemory +=
+          (child.calculateMaxReturnSize() + child.calculateRetainedSizeAfterCallingNext());
     }
+
+    maxPeekMemory += calculateMaxReturnSize();
     return Math.max(maxPeekMemory, childrenMaxPeekMemory);
   }
 
@@ -263,6 +267,18 @@ public class RowBasedTimeJoinOperator implements ProcessOperator {
         * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
   }
 
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long currentRetainedSize = 0, minChildReturnSize = Long.MAX_VALUE;
+    for (Operator child : children) {
+      long maxReturnSize = child.calculateMaxReturnSize();
+      currentRetainedSize += (maxReturnSize + child.calculateRetainedSizeAfterCallingNext());
+      minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
+    }
+    // max cached TsBlock
+    return currentRetainedSize - minChildReturnSize;
+  }
+
   private void updateTimeSelector(int index) {
     timeSelector.add(inputTsBlocks[index].getTimeByIndex(inputIndex[index]));
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
index bd5e005acd..3170838986 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
@@ -246,12 +246,16 @@ public class TimeJoinOperator implements ProcessOperator {
 
   @Override
   public long calculateMaxPeekMemory() {
-    long maxPeekMemory = calculateMaxReturnSize();
+    long maxPeekMemory = 0;
     long childrenMaxPeekMemory = 0;
     for (Operator child : children) {
-      maxPeekMemory += child.calculateMaxReturnSize();
-      childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+      childrenMaxPeekMemory =
+          Math.max(childrenMaxPeekMemory, maxPeekMemory + child.calculateMaxPeekMemory());
+      maxPeekMemory +=
+          (child.calculateMaxReturnSize() + child.calculateRetainedSizeAfterCallingNext());
     }
+
+    maxPeekMemory += calculateMaxReturnSize();
     return Math.max(maxPeekMemory, childrenMaxPeekMemory);
   }
 
@@ -262,6 +266,18 @@ public class TimeJoinOperator implements ProcessOperator {
         * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
   }
 
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long currentRetainedSize = 0, minChildReturnSize = Long.MAX_VALUE;
+    for (Operator child : children) {
+      long maxReturnSize = child.calculateMaxReturnSize();
+      currentRetainedSize += (maxReturnSize + child.calculateRetainedSizeAfterCallingNext());
+      minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
+    }
+    // max cached TsBlock
+    return currentRetainedSize - minChildReturnSize;
+  }
+
   /**
    * If the tsBlock of columnIndex is null or has no more data in the tsBlock, return true; else
    * return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
index f679bd1769..b91e346372 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
@@ -91,6 +91,7 @@ public class LastQueryCollectOperator implements ProcessOperator {
     long maxPeekMemory = 0;
     for (Operator child : children) {
       maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory());
+      maxPeekMemory = Math.max(maxPeekMemory, child.calculateRetainedSizeAfterCallingNext());
     }
     return maxPeekMemory;
   }
@@ -103,4 +104,13 @@ public class LastQueryCollectOperator implements ProcessOperator {
     }
     return maxReturnMemory;
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long sum = 0;
+    for (Operator operator : children) {
+      sum += operator.calculateRetainedSizeAfterCallingNext();
+    }
+    return sum;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
index af6e7b4bf7..5ad04bdc2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
@@ -225,16 +225,19 @@ public class LastQueryMergeOperator implements ProcessOperator {
 
   @Override
   public long calculateMaxPeekMemory() {
-    // result size + cached TreeMap size
-    long maxPeekMemory =
-        calculateMaxReturnSize()
-            + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()
-                * MAP_NODE_RETRAINED_SIZE;
+    long maxPeekMemory = 0;
     long childrenMaxPeekMemory = 0;
     for (Operator child : children) {
-      maxPeekMemory += child.calculateMaxReturnSize();
-      childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+      childrenMaxPeekMemory =
+          Math.max(childrenMaxPeekMemory, maxPeekMemory + child.calculateMaxPeekMemory());
+      maxPeekMemory +=
+          (child.calculateMaxReturnSize() + child.calculateRetainedSizeAfterCallingNext());
     }
+    // result size + cached TreeMap size
+    maxPeekMemory +=
+        (calculateMaxReturnSize()
+            + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()
+                * MAP_NODE_RETRAINED_SIZE);
     return Math.max(maxPeekMemory, childrenMaxPeekMemory);
   }
 
@@ -247,6 +250,20 @@ public class LastQueryMergeOperator implements ProcessOperator {
     return maxReturnSize;
   }
 
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long childrenSum = 0, minChildReturnSize = Long.MAX_VALUE;
+    for (Operator child : children) {
+      long maxReturnSize = child.calculateMaxReturnSize();
+      childrenSum += (maxReturnSize + child.calculateRetainedSizeAfterCallingNext());
+      minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
+    }
+    // max cached TsBlock + cached TreeMap size
+    return (childrenSum - minChildReturnSize)
+        + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()
+            * MAP_NODE_RETRAINED_SIZE;
+  }
+
   /**
    * If the tsBlock of columnIndex is null or has no more data in the tsBlock, return true; else
    * return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
index 40e434ef54..60620b1f56 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
@@ -146,14 +146,24 @@ public class LastQueryOperator implements ProcessOperator {
   public long calculateMaxPeekMemory() {
     long maxPeekMemory =
         Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, tsBlockBuilder.getRetainedSizeInBytes());
+    long res = 0;
     for (Operator child : children) {
-      maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory());
+      res = Math.max(res, maxPeekMemory + child.calculateMaxPeekMemory());
     }
-    return maxPeekMemory;
+    return res;
   }
 
   @Override
   public long calculateMaxReturnSize() {
     return Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, tsBlockBuilder.getRetainedSizeInBytes());
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long sum = 0;
+    for (Operator operator : children) {
+      sum += operator.calculateRetainedSizeAfterCallingNext();
+    }
+    return sum;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
index e40da67999..082173c29f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
@@ -194,13 +194,11 @@ public class LastQuerySortOperator implements ProcessOperator {
   @Override
   public long calculateMaxPeekMemory() {
     long maxPeekMemory = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + cachedTsBlock.getRetainedSizeInBytes();
-    long maxChildrenReturnSize = 0;
-    long maxChildrenPeekMemory = 0;
+    long res = 0;
     for (Operator child : children) {
-      maxChildrenReturnSize = Math.max(maxChildrenReturnSize, child.calculateMaxReturnSize());
-      maxChildrenPeekMemory = Math.max(maxChildrenPeekMemory, child.calculateMaxPeekMemory());
+      res = Math.max(res, maxPeekMemory + child.calculateMaxPeekMemory());
     }
-    return Math.max(maxPeekMemory + maxChildrenReturnSize, maxChildrenPeekMemory);
+    return res;
   }
 
   @Override
@@ -208,6 +206,17 @@ public class LastQuerySortOperator implements ProcessOperator {
     return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
   }
 
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long childrenMaxReturnSize = 0;
+    long childrenSumRetainedSize = 0;
+    for (Operator child : children) {
+      childrenMaxReturnSize = Math.max(childrenMaxReturnSize, child.calculateMaxReturnSize());
+      childrenSumRetainedSize += child.calculateRetainedSizeAfterCallingNext();
+    }
+    return cachedTsBlock.getRetainedSizeInBytes() + childrenMaxReturnSize + childrenSumRetainedSize;
+  }
+
   private int getEndIndex() {
     return currentIndex + Math.min(MAX_DETECT_COUNT, inputOperatorsCount - currentIndex);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
index afd543b8dc..44e423424e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
@@ -141,4 +141,9 @@ public class UpdateLastCacheOperator implements ProcessOperator {
   public long calculateMaxReturnSize() {
     return child.calculateMaxReturnSize();
   }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return child.calculateRetainedSizeAfterCallingNext();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
index 5b4f07cf24..c64947e4b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
@@ -133,6 +133,11 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
     return maxReturnSize;
   }
 
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L;
+  }
+
   private boolean readChunkData() throws IOException {
     while (seriesScanUtil.hasNextChunk()) {
       if (readPageData()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
index d9202de3f2..110079809b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
@@ -74,6 +74,11 @@ public class ExchangeOperator implements SourceOperator {
     return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
   }
 
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L;
+  }
+
   @Override
   public PlanNodeId getSourceId() {
     return sourceId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
index 6e5ff45cfa..974758f8a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
@@ -67,6 +67,11 @@ public class LastCacheScanOperator implements SourceOperator {
     return tsBlock.getRetainedSizeInBytes();
   }
 
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L;
+  }
+
   @Override
   public PlanNodeId getSourceId() {
     return sourceId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
index 5a58594940..05685f758d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
@@ -134,6 +134,11 @@ public class SeriesScanOperator implements DataSourceOperator {
     return maxReturnSize;
   }
 
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L;
+  }
+
   private boolean readChunkData() throws IOException {
     while (seriesScanUtil.hasNextChunk()) {
       if (readPageData()) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index b57ee14dcf..349e98758b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -108,6 +108,8 @@ public class OperatorMemoryTest {
       assertEquals(
           TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
           seriesScanOperator.calculateMaxReturnSize());
+      assertEquals(0, seriesScanOperator.calculateRetainedSizeAfterCallingNext());
+
     } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
@@ -152,6 +154,8 @@ public class OperatorMemoryTest {
           4 * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
           seriesScanOperator.calculateMaxReturnSize());
 
+      assertEquals(0, seriesScanOperator.calculateRetainedSizeAfterCallingNext());
+
     } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
@@ -166,6 +170,7 @@ public class OperatorMemoryTest {
 
     assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, exchangeOperator.calculateMaxPeekMemory());
     assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, exchangeOperator.calculateMaxReturnSize());
+    assertEquals(0, exchangeOperator.calculateRetainedSizeAfterCallingNext());
   }
 
   @Test
@@ -176,6 +181,7 @@ public class OperatorMemoryTest {
 
     assertEquals(1024, lastCacheScanOperator.calculateMaxPeekMemory());
     assertEquals(1024, lastCacheScanOperator.calculateMaxReturnSize());
+    assertEquals(0, lastCacheScanOperator.calculateRetainedSizeAfterCallingNext());
   }
 
   @Test
@@ -183,12 +189,14 @@ public class OperatorMemoryTest {
     Operator child = Mockito.mock(Operator.class);
     Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L);
     Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+    Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
 
     FillOperator fillOperator =
         new FillOperator(Mockito.mock(OperatorContext.class), new IFill[] {null, null}, child);
 
-    assertEquals(2048 * 2, fillOperator.calculateMaxPeekMemory());
+    assertEquals(2048 * 2 + 512, fillOperator.calculateMaxPeekMemory());
     assertEquals(1024, fillOperator.calculateMaxReturnSize());
+    assertEquals(512, fillOperator.calculateRetainedSizeAfterCallingNext());
   }
 
   @Test
@@ -203,6 +211,7 @@ public class OperatorMemoryTest {
       long currentMaxReturnSize = random.nextInt(1024);
       Mockito.when(child.calculateMaxPeekMemory()).thenReturn(currentMaxPeekMemory);
       Mockito.when(child.calculateMaxReturnSize()).thenReturn(currentMaxReturnSize);
+      Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
       children.add(child);
       expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, currentMaxPeekMemory);
       expectedMaxReturnSize = Math.max(expectedMaxReturnSize, currentMaxReturnSize);
@@ -212,6 +221,7 @@ public class OperatorMemoryTest {
 
     assertEquals(expectedMaxPeekMemory, lastQueryCollectOperator.calculateMaxPeekMemory());
     assertEquals(expectedMaxReturnSize, lastQueryCollectOperator.calculateMaxReturnSize());
+    assertEquals(4 * 512, lastQueryCollectOperator.calculateRetainedSizeAfterCallingNext());
   }
 
   @Test
@@ -219,18 +229,24 @@ public class OperatorMemoryTest {
     List<Operator> children = new ArrayList<>(4);
     Random random = new Random();
     long expectedMaxPeekMemory = 0;
+    long temp = 0;
     long expectedMaxReturnSize = 0;
     long childSumReturnSize = 0;
+    long minReturnSize = Long.MAX_VALUE;
     for (int i = 0; i < 4; i++) {
       Operator child = Mockito.mock(Operator.class);
       long currentMaxPeekMemory = random.nextInt(1024) + 1024;
       long currentMaxReturnSize = random.nextInt(1024);
+      minReturnSize = Math.min(minReturnSize, currentMaxReturnSize);
       childSumReturnSize += currentMaxReturnSize;
       Mockito.when(child.calculateMaxPeekMemory()).thenReturn(currentMaxPeekMemory);
       Mockito.when(child.calculateMaxReturnSize()).thenReturn(currentMaxReturnSize);
+      Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
       children.add(child);
       expectedMaxReturnSize = Math.max(expectedMaxReturnSize, currentMaxReturnSize);
-      expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, currentMaxPeekMemory);
+      expectedMaxPeekMemory =
+          Math.max(expectedMaxPeekMemory, temp + child.calculateMaxPeekMemory());
+      temp += (child.calculateMaxReturnSize() + child.calculateRetainedSizeAfterCallingNext());
     }
     // we need to cache all the TsBlocks of children and then return a new TsBlock as result whose
     // max possible should be equal to max return size among all its children and then we should
@@ -238,7 +254,7 @@ public class OperatorMemoryTest {
     expectedMaxPeekMemory =
         Math.max(
             expectedMaxPeekMemory,
-            childSumReturnSize
+            temp
                 + expectedMaxReturnSize
                 + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()
                     * MAP_NODE_RETRAINED_SIZE);
@@ -249,6 +265,13 @@ public class OperatorMemoryTest {
 
     assertEquals(expectedMaxPeekMemory, lastQueryMergeOperator.calculateMaxPeekMemory());
     assertEquals(expectedMaxReturnSize, lastQueryMergeOperator.calculateMaxReturnSize());
+    assertEquals(
+        childSumReturnSize
+            - minReturnSize
+            + 4 * 512
+            + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()
+                * MAP_NODE_RETRAINED_SIZE,
+        lastQueryMergeOperator.calculateRetainedSizeAfterCallingNext());
   }
 
   @Test
@@ -256,25 +279,28 @@ public class OperatorMemoryTest {
     TsBlockBuilder builder = Mockito.mock(TsBlockBuilder.class);
     Mockito.when(builder.getRetainedSizeInBytes()).thenReturn(1024L);
     List<UpdateLastCacheOperator> children = new ArrayList<>(4);
-    long expectedMaxPeekMemory = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
     long expectedMaxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
     for (int i = 0; i < 4; i++) {
       UpdateLastCacheOperator child = Mockito.mock(UpdateLastCacheOperator.class);
       Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2 * 1024 * 1024L);
       Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+      Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
       children.add(child);
-      expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, 2 * 1024 * 1024L);
       expectedMaxReturnSize = Math.max(expectedMaxReturnSize, 1024L);
     }
     LastQueryOperator lastQueryOperator =
         new LastQueryOperator(Mockito.mock(OperatorContext.class), children, builder);
 
-    assertEquals(expectedMaxPeekMemory, lastQueryOperator.calculateMaxPeekMemory());
+    assertEquals(
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + 2 * 1024 * 1024L,
+        lastQueryOperator.calculateMaxPeekMemory());
     assertEquals(expectedMaxReturnSize, lastQueryOperator.calculateMaxReturnSize());
+    assertEquals(4 * 512L, lastQueryOperator.calculateRetainedSizeAfterCallingNext());
 
     Mockito.when(builder.getRetainedSizeInBytes()).thenReturn(4 * 1024 * 1024L);
-    assertEquals(4 * 1024 * 1024L, lastQueryOperator.calculateMaxPeekMemory());
+    assertEquals(4 * 1024 * 1024L + 2 * 1024 * 1024L, lastQueryOperator.calculateMaxPeekMemory());
     assertEquals(4 * 1024 * 1024L, lastQueryOperator.calculateMaxReturnSize());
+    assertEquals(4 * 512L, lastQueryOperator.calculateRetainedSizeAfterCallingNext());
   }
 
   @Test
@@ -283,23 +309,26 @@ public class OperatorMemoryTest {
     Mockito.when(tsBlock.getRetainedSizeInBytes()).thenReturn(16 * 1024L);
     Mockito.when(tsBlock.getPositionCount()).thenReturn(16);
     List<UpdateLastCacheOperator> children = new ArrayList<>(4);
-    long expectedMaxPeekMemory =
-        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + tsBlock.getRetainedSizeInBytes();
+
     for (int i = 0; i < 4; i++) {
       UpdateLastCacheOperator child = Mockito.mock(UpdateLastCacheOperator.class);
       Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2 * 1024L);
       Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+      Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
       children.add(child);
     }
 
-    expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory + 1024, 2 * 1024);
-
     LastQuerySortOperator lastQuerySortOperator =
         new LastQuerySortOperator(
             Mockito.mock(OperatorContext.class), tsBlock, children, Comparator.naturalOrder());
 
-    assertEquals(expectedMaxPeekMemory, lastQuerySortOperator.calculateMaxPeekMemory());
+    assertEquals(
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + tsBlock.getRetainedSizeInBytes() + 2 * 1024L,
+        lastQuerySortOperator.calculateMaxPeekMemory());
     assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, lastQuerySortOperator.calculateMaxReturnSize());
+    assertEquals(
+        16 * 1024L + 1024L + 4 * 512L,
+        lastQuerySortOperator.calculateRetainedSizeAfterCallingNext());
   }
 
   @Test
@@ -307,12 +336,14 @@ public class OperatorMemoryTest {
     Operator child = Mockito.mock(Operator.class);
     Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2 * 1024L);
     Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+    Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
 
     LimitOperator limitOperator =
         new LimitOperator(Mockito.mock(OperatorContext.class), 100, child);
 
     assertEquals(2 * 1024L, limitOperator.calculateMaxPeekMemory());
     assertEquals(1024, limitOperator.calculateMaxReturnSize());
+    assertEquals(512, limitOperator.calculateRetainedSizeAfterCallingNext());
   }
 
   @Test
@@ -320,12 +351,14 @@ public class OperatorMemoryTest {
     Operator child = Mockito.mock(Operator.class);
     Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2 * 1024L);
     Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+    Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
 
     OffsetOperator offsetOperator =
         new OffsetOperator(Mockito.mock(OperatorContext.class), 100, child);
 
     assertEquals(2 * 1024L, offsetOperator.calculateMaxPeekMemory());
     assertEquals(1024, offsetOperator.calculateMaxReturnSize());
+    assertEquals(512, offsetOperator.calculateRetainedSizeAfterCallingNext());
   }
 
   @Test
@@ -336,19 +369,22 @@ public class OperatorMemoryTest {
     dataTypeList.add(TSDataType.INT32);
     long expectedMaxReturnSize =
         3L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
-    long expectedMaxPeekMemory = expectedMaxReturnSize;
+    long expectedMaxPeekMemory = 0;
     long childrenMaxPeekMemory = 0;
 
     for (int i = 0; i < 4; i++) {
       Operator child = Mockito.mock(Operator.class);
       Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
       Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+      Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+      childrenMaxPeekMemory =
+          Math.max(childrenMaxPeekMemory, expectedMaxPeekMemory + child.calculateMaxPeekMemory());
       expectedMaxPeekMemory += 64 * 1024L;
-      childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
       children.add(child);
     }
 
-    expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, childrenMaxPeekMemory);
+    expectedMaxPeekMemory =
+        Math.max(expectedMaxPeekMemory + expectedMaxReturnSize, childrenMaxPeekMemory);
 
     RowBasedTimeJoinOperator rowBasedTimeJoinOperator =
         new RowBasedTimeJoinOperator(
@@ -356,6 +392,7 @@ public class OperatorMemoryTest {
 
     assertEquals(expectedMaxPeekMemory, rowBasedTimeJoinOperator.calculateMaxPeekMemory());
     assertEquals(expectedMaxReturnSize, rowBasedTimeJoinOperator.calculateMaxReturnSize());
+    assertEquals(3 * 64 * 1024L, rowBasedTimeJoinOperator.calculateRetainedSizeAfterCallingNext());
   }
 
   @Test
@@ -363,6 +400,7 @@ public class OperatorMemoryTest {
     SortOperator sortOperator = new SortOperator();
     assertEquals(0, sortOperator.calculateMaxPeekMemory());
     assertEquals(0, sortOperator.calculateMaxReturnSize());
+    assertEquals(0, sortOperator.calculateRetainedSizeAfterCallingNext());
   }
 
   @Test
@@ -373,19 +411,22 @@ public class OperatorMemoryTest {
     dataTypeList.add(TSDataType.INT32);
     long expectedMaxReturnSize =
         3L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
-    long expectedMaxPeekMemory = expectedMaxReturnSize;
+    long expectedMaxPeekMemory = 0;
     long childrenMaxPeekMemory = 0;
 
     for (int i = 0; i < 4; i++) {
       Operator child = Mockito.mock(Operator.class);
       Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
       Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+      Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+      childrenMaxPeekMemory =
+          Math.max(childrenMaxPeekMemory, expectedMaxPeekMemory + child.calculateMaxPeekMemory());
       expectedMaxPeekMemory += 64 * 1024L;
-      childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
       children.add(child);
     }
 
-    expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, childrenMaxPeekMemory);
+    expectedMaxPeekMemory =
+        Math.max(expectedMaxPeekMemory + expectedMaxReturnSize, childrenMaxPeekMemory);
 
     TimeJoinOperator timeJoinOperator =
         new TimeJoinOperator(
@@ -393,6 +434,7 @@ public class OperatorMemoryTest {
 
     assertEquals(expectedMaxPeekMemory, timeJoinOperator.calculateMaxPeekMemory());
     assertEquals(expectedMaxReturnSize, timeJoinOperator.calculateMaxReturnSize());
+    assertEquals(3 * 64 * 1024L, timeJoinOperator.calculateRetainedSizeAfterCallingNext());
   }
 
   @Test
@@ -400,12 +442,14 @@ public class OperatorMemoryTest {
     Operator child = Mockito.mock(Operator.class);
     Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L);
     Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+    Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
 
     UpdateLastCacheOperator updateLastCacheOperator =
         new UpdateLastCacheOperator(null, child, null, TSDataType.BOOLEAN, null, true);
 
     assertEquals(2048, updateLastCacheOperator.calculateMaxPeekMemory());
     assertEquals(1024, updateLastCacheOperator.calculateMaxReturnSize());
+    assertEquals(512, updateLastCacheOperator.calculateRetainedSizeAfterCallingNext());
   }
 
   @Test
@@ -413,12 +457,14 @@ public class OperatorMemoryTest {
     Operator child = Mockito.mock(Operator.class);
     Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L);
     Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+    Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
 
     LinearFillOperator linearFillOperator =
         new LinearFillOperator(
             Mockito.mock(OperatorContext.class), new LinearFill[] {null, null}, child);
 
-    assertEquals(2048 * 3, linearFillOperator.calculateMaxPeekMemory());
+    assertEquals(2048 * 3 + 512L, linearFillOperator.calculateMaxPeekMemory());
     assertEquals(1024, linearFillOperator.calculateMaxReturnSize());
+    assertEquals(512, linearFillOperator.calculateRetainedSizeAfterCallingNext());
   }
 }