You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/08 12:56:15 UTC

[iotdb] 01/01: Add memory control for some operators

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch MemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 93cea7db3125ccb0c865f9433856158f3f883e2c
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Aug 8 20:55:30 2022 +0800

    Add memory control for some operators
---
 .../resources/conf/iotdb-datanode.properties       |  6 +--
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 46 ++++++++++++++++------
 .../mpp/execution/memory/LocalMemoryManager.java   |  8 ++--
 .../iotdb/db/mpp/execution/operator/Operator.java  | 19 +++++++++
 .../execution/operator/process/FillOperator.java   | 13 ++++++
 .../execution/operator/process/LimitOperator.java  | 10 +++++
 .../operator/process/LinearFillOperator.java       | 14 +++++++
 .../execution/operator/process/OffsetOperator.java | 10 +++++
 .../execution/operator/process/SortOperator.java   | 10 +++++
 .../process/join/RowBasedTimeJoinOperator.java     | 19 +++++++++
 .../operator/process/join/TimeJoinOperator.java    | 19 +++++++++
 .../process/last/LastQueryCollectOperator.java     | 18 +++++++++
 .../process/last/LastQueryMergeOperator.java       | 30 ++++++++++++++
 .../operator/process/last/LastQueryOperator.java   | 16 ++++++++
 .../process/last/LastQuerySortOperator.java        | 17 ++++++++
 .../process/last/UpdateLastCacheOperator.java      | 10 +++++
 .../operator/source/AlignedSeriesScanOperator.java | 21 +++++++++-
 .../operator/source/ExchangeOperator.java          | 12 ++++++
 .../operator/source/LastCacheScanOperator.java     | 10 +++++
 .../operator/source/SeriesScanOperator.java        | 18 ++++++++-
 .../execution/operator/source/SeriesScanUtil.java  |  4 +-
 21 files changed, 307 insertions(+), 23 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 57448902cd..35a0f89d14 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -637,9 +637,9 @@ timestamp_precision=ms
 # Datatype: boolean
 # meta_data_cache_enable=true
 
-# Read memory Allocation Ratio: BloomFilterCache, ChunkCache, TimeSeriesMetadataCache, memory used for constructing QueryDataSet and Free Memory Used in Query.
-# The parameter form is a:b:c:d:e, where a, b, c, d and e are integers. for example: 1:1:1:1:1 , 1:100:200:300:400
-# chunk_timeseriesmeta_free_memory_proportion=1:100:200:300:400
+# Read memory Allocation Ratio: BloomFilterCache : ChunkCache : TimeSeriesMetadataCache : Coordinator : Operators : DataExchange.
+# The parameter form is a:b:c:d:e:f, where a, b, c, d, e and f are integers. For example: 1:1:1:1:1:1, 1:100:200:50:300:350
+# chunk_timeseriesmeta_free_memory_proportion=1:100:200:50:300:350
 
 ####################
 ### LAST Cache Configuration
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index cb2583e4d9..17234c37be 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -130,10 +130,7 @@ public class IoTDBConfig {
   private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() * 3 / 10;
 
   /** Memory allocated for the mtree */
-  private long allocateMemoryForSchema = Runtime.getRuntime().maxMemory() * 1 / 10;
-
-  /** Memory allocated for the read process besides cache */
-  private long allocateMemoryForReadWithoutCache = allocateMemoryForRead * 300 / 1001;
+  private long allocateMemoryForSchema = Runtime.getRuntime().maxMemory() / 10;
 
   private volatile int maxQueryDeduplicatedPathNum = 1000;
 
@@ -474,6 +471,15 @@ public class IoTDBConfig {
   /** Memory allocated for chunk cache in read process */
   private long allocateMemoryForChunkCache = allocateMemoryForRead * 100 / 1001;
 
+  /** Memory allocated for coordinator */
+  private long allocateMemoryForCoordinator = allocateMemoryForRead * 50 / 1001;
+
+  /** Memory allocated for operators */
+  private long allocateMemoryForOperators = allocateMemoryForRead * 300 / 1001;
+
+  /** Memory allocated for data exchange */
+  private long allocateMemoryForDataExchange = allocateMemoryForRead * 350 / 1001;
+
   /** Whether to enable Last cache */
   private boolean lastCacheEnable = true;
 
@@ -1715,14 +1721,6 @@ public class IoTDBConfig {
     this.allocateMemoryForRead = allocateMemoryForRead;
   }
 
-  public long getAllocateMemoryForReadWithoutCache() {
-    return allocateMemoryForReadWithoutCache;
-  }
-
-  public void setAllocateMemoryForReadWithoutCache(long allocateMemoryForReadWithoutCache) {
-    this.allocateMemoryForReadWithoutCache = allocateMemoryForReadWithoutCache;
-  }
-
   public boolean isEnableExternalSort() {
     return enableExternalSort;
   }
@@ -1934,6 +1932,30 @@ public class IoTDBConfig {
     this.allocateMemoryForChunkCache = allocateMemoryForChunkCache;
   }
 
+  public long getAllocateMemoryForCoordinator() {
+    return allocateMemoryForCoordinator;
+  }
+
+  public void setAllocateMemoryForCoordinator(long allocateMemoryForCoordinator) {
+    this.allocateMemoryForCoordinator = allocateMemoryForCoordinator;
+  }
+
+  public long getAllocateMemoryForOperators() {
+    return allocateMemoryForOperators;
+  }
+
+  public void setAllocateMemoryForOperators(long allocateMemoryForOperators) {
+    this.allocateMemoryForOperators = allocateMemoryForOperators;
+  }
+
+  public long getAllocateMemoryForDataExchange() {
+    return allocateMemoryForDataExchange;
+  }
+
+  public void setAllocateMemoryForDataExchange(long allocateMemoryForDataExchange) {
+    this.allocateMemoryForDataExchange = allocateMemoryForDataExchange;
+  }
+
   public boolean isLastCacheEnabled() {
     return lastCacheEnable;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java
index d0eff60394..5c6f3c5659 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java
@@ -30,11 +30,9 @@ public class LocalMemoryManager {
   private final MemoryPool queryPool;
 
   public LocalMemoryManager() {
-    queryPool =
-        new MemoryPool(
-            "query",
-            IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForRead(),
-            (long) (IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForRead() * 0.5));
+    long totalMemory = IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForDataExchange();
+    int maxQueryThread = IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread();
+    queryPool = new MemoryPool("query", totalMemory, totalMemory / maxQueryThread);
   }
 
   public MemoryPool getQueryPool() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
index dfa08e033f..7d8765eaa3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
@@ -52,4 +52,23 @@ public interface Operator extends AutoCloseable {
    * Is this operator completely finished processing and no more output TsBlock will be produced.
    */
   boolean isFinished();
+
+  // TODO remove the default while completing all the operators
+  /**
+   * We should also consider the memory used by its child operators, so the calculation logic may
+   * be like: long estimatedOfCurrentOperator = XXXXX; return max(estimatedOfCurrentOperator,
+   * child1.calculateMaxPeekMemory(), child2.calculateMaxPeekMemory(), ....)
+   *
+   * @return estimated max memory footprint that the Operator Tree(rooted from this operator) will
+   *     use while doing its own query processing
+   */
+  default long calculateMaxPeekMemory() {
+    return 0L;
+  }
+
+  // TODO remove the default while completing all the operators
+  /** @return estimated max memory footprint for returned TsBlock when calling operator.next() */
+  default long calculateMaxReturnSize() {
+    return 0L;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
index da7ffe6ab1..94fa08fcd9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
@@ -91,4 +91,17 @@ public class FillOperator implements ProcessOperator {
   public boolean isFinished() {
     return child.isFinished();
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    // while doing constant and previous fill, we may need to copy the corresponding column if there
+    // exists null values
+    // so the max peak memory may be doubled
+    return 2 * child.calculateMaxPeekMemory();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return child.calculateMaxReturnSize();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
index ef3870fb17..4c5d608902 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
@@ -80,4 +80,14 @@ public class LimitOperator implements ProcessOperator {
   public boolean isFinished() {
     return remainingLimit == 0 || child.isFinished();
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return child.calculateMaxPeekMemory();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return child.calculateMaxReturnSize();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
index fcae23ce7c..ee69557074 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
@@ -160,6 +160,20 @@ public class LinearFillOperator implements ProcessOperator {
     return cachedTsBlock.isEmpty() && child.isFinished();
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    // while doing linear fill, we may need to copy the corresponding column if there exists null
+    // values, and we may also need to cache next TsBlock to get next not null value
+    // so the max peak memory may be triple or more, here we just use 3 as the estimated factor
+    // because in most cases, we will get next not null value in next TsBlock
+    return 3 * child.calculateMaxPeekMemory();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return child.calculateMaxReturnSize();
+  }
+
   /**
    * Judge whether we can use current cached TsBlock to fill Column
    *
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
index 2820992745..a329b25346 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
@@ -79,4 +79,14 @@ public class OffsetOperator implements ProcessOperator {
   public boolean isFinished() {
     return child.isFinished();
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return child.calculateMaxPeekMemory();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return child.calculateMaxReturnSize();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
index 38bda24bf7..a7cd8d046a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
@@ -54,4 +54,14 @@ public class SortOperator implements ProcessOperator {
   public boolean isFinished() {
     return false;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return 0;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return 0;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
index f026ef35c4..386f840d6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerge
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
@@ -244,6 +245,24 @@ public class RowBasedTimeJoinOperator implements ProcessOperator {
     return finished;
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    long maxPeekMemory = calculateMaxReturnSize();
+    long childrenMaxPeekMemory = 0;
+    for (Operator child : children) {
+      maxPeekMemory += child.calculateMaxReturnSize();
+      childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+    }
+    return Math.max(maxPeekMemory, childrenMaxPeekMemory);
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    // time + all value columns
+    return (1L + outputColumnCount)
+        * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+  }
+
   private void updateTimeSelector(int index) {
     timeSelector.add(inputTsBlocks[index].getTimeByIndex(inputIndex[index]));
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
index eb185e09ff..bd5e005acd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerge
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
@@ -243,6 +244,24 @@ public class TimeJoinOperator implements ProcessOperator {
     return finished;
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    long maxPeekMemory = calculateMaxReturnSize();
+    long childrenMaxPeekMemory = 0;
+    for (Operator child : children) {
+      maxPeekMemory += child.calculateMaxReturnSize();
+      childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+    }
+    return Math.max(maxPeekMemory, childrenMaxPeekMemory);
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    // time + all value columns
+    return (1L + outputColumnCount)
+        * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+  }
+
   /**
    * If the tsBlock of columnIndex is null or has no more data in the tsBlock, return true; else
    * return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
index 7f011c32cf..f679bd1769 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
@@ -85,4 +85,22 @@ public class LastQueryCollectOperator implements ProcessOperator {
   public boolean isFinished() {
     return !hasNext();
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    long maxPeekMemory = 0;
+    for (Operator child : children) {
+      maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory());
+    }
+    return maxPeekMemory;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    long maxReturnMemory = 0;
+    for (Operator child : children) {
+      maxReturnMemory = Math.max(maxReturnMemory, child.calculateMaxReturnSize());
+    }
+    return maxReturnMemory;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
index 99deebfd59..9d8be1e23d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
@@ -21,11 +21,13 @@ package org.apache.iotdb.db.mpp.execution.operator.process.last;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.utils.Binary;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import org.openjdk.jol.info.ClassLayout;
 
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -40,6 +42,8 @@ import static org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryU
 // time-series
 public class LastQueryMergeOperator implements ProcessOperator {
 
+  private static final int MAP_NODE_RETRAINED_SIZE = 16;
+
   private final OperatorContext operatorContext;
 
   private final List<Operator> children;
@@ -219,6 +223,30 @@ public class LastQueryMergeOperator implements ProcessOperator {
     return finished;
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    // result size + cached TreeMap size
+    long maxPeekMemory =
+        calculateMaxReturnSize()
+            + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()
+                * (Location.INSTANCE_SIZE + MAP_NODE_RETRAINED_SIZE);
+    long childrenMaxPeekMemory = 0;
+    for (Operator child : children) {
+      maxPeekMemory += child.calculateMaxReturnSize();
+      childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+    }
+    return Math.max(maxPeekMemory, childrenMaxPeekMemory);
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    long maxReturnSize = 0;
+    for (Operator child : children) {
+      maxReturnSize = Math.max(maxReturnSize, child.calculateMaxReturnSize());
+    }
+    return maxReturnSize;
+  }
+
   /**
    * If the tsBlock of columnIndex is null or has no more data in the tsBlock, return true; else
    * return false;
@@ -241,6 +269,8 @@ public class LastQueryMergeOperator implements ProcessOperator {
   }
 
   private static class Location {
+
+    private static final long INSTANCE_SIZE = ClassLayout.parseClass(Location.class).instanceSize();
     int tsBlockIndex;
     int rowIndex;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
index 57d391d5df..40e434ef54 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
@@ -33,6 +33,7 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import static com.google.common.util.concurrent.Futures.successfulAsList;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 // collect all last query result in the same data region and there is no order guarantee
 public class LastQueryOperator implements ProcessOperator {
@@ -140,4 +141,19 @@ public class LastQueryOperator implements ProcessOperator {
   private int getEndIndex() {
     return currentIndex + Math.min(MAX_DETECT_COUNT, inputOperatorsCount - currentIndex);
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    long maxPeekMemory =
+        Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, tsBlockBuilder.getRetainedSizeInBytes());
+    for (Operator child : children) {
+      maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory());
+    }
+    return maxPeekMemory;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, tsBlockBuilder.getRetainedSizeInBytes());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
index 2945856933..1b56d844c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
 
 import static com.google.common.util.concurrent.Futures.successfulAsList;
 import static org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryUtil.compareTimeSeries;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 // collect all last query result in the same data region and sort them according to the
 // time-series's alphabetical order
@@ -190,6 +191,22 @@ public class LastQuerySortOperator implements ProcessOperator {
     return !hasNext();
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    // peak = output TsBlock + current tsBlockBuilder content + largest TsBlock a child returns
+    long maxChildrenReturnSize = 0;
+    for (Operator child : children) {
+      maxChildrenReturnSize = Math.max(maxChildrenReturnSize, child.calculateMaxReturnSize());
+    }
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + tsBlockBuilder.getRetainedSizeInBytes()
+        + maxChildrenReturnSize;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
   private int getEndIndex() {
     return currentIndex + Math.min(MAX_DETECT_COUNT, inputOperatorsCount - currentIndex);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
index 93a5d81cc3..afd543b8dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
@@ -131,4 +131,14 @@ public class UpdateLastCacheOperator implements ProcessOperator {
   public void close() throws Exception {
     child.close();
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return child.calculateMaxPeekMemory();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return child.calculateMaxReturnSize();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
index c47ab9f95d..5b4f07cf24 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
@@ -37,6 +38,8 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
   private boolean hasCachedTsBlock = false;
   private boolean finished = false;
 
+  private final long maxReturnSize;
+
   public AlignedSeriesScanOperator(
       PlanNodeId sourceId,
       AlignedPath seriesPath,
@@ -54,6 +57,10 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
             timeFilter,
             valueFilter,
             ascending);
+    // time + all value columns
+    this.maxReturnSize =
+        (1L + seriesPath.getMeasurementList().size())
+            * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
   }
 
   @Override
@@ -65,7 +72,9 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
   public TsBlock next() {
     if (hasCachedTsBlock || hasNext()) {
       hasCachedTsBlock = false;
-      return tsBlock;
+      TsBlock res = tsBlock;
+      tsBlock = null;
+      return res;
     }
     throw new IllegalStateException("no next batch");
   }
@@ -114,6 +123,16 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
     return finished || (finished = !hasNext());
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return maxReturnSize;
+  }
+
   private boolean readChunkData() throws IOException {
     while (seriesScanUtil.hasNextChunk()) {
       if (readPageData()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
index c72063caeb..d9202de3f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
 public class ExchangeOperator implements SourceOperator {
 
   private final OperatorContext operatorContext;
@@ -62,6 +64,16 @@ public class ExchangeOperator implements SourceOperator {
     return sourceHandle.isFinished();
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
   @Override
   public PlanNodeId getSourceId() {
     return sourceId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
index dfb6d82c5c..6e5ff45cfa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
@@ -57,6 +57,16 @@ public class LastCacheScanOperator implements SourceOperator {
     return !hasNext();
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    return tsBlock.getRetainedSizeInBytes();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return tsBlock.getRetainedSizeInBytes();
+  }
+
   @Override
   public PlanNodeId getSourceId() {
     return sourceId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
index f74d14a2f6..5a58594940 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -38,6 +39,8 @@ public class SeriesScanOperator implements DataSourceOperator {
   private boolean hasCachedTsBlock = false;
   private boolean finished = false;
 
+  private final long maxReturnSize;
+
   public SeriesScanOperator(
       PlanNodeId sourceId,
       PartialPath seriesPath,
@@ -58,6 +61,7 @@ public class SeriesScanOperator implements DataSourceOperator {
             timeFilter,
             valueFilter,
             ascending);
+    this.maxReturnSize = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
   }
 
   @Override
@@ -69,7 +73,9 @@ public class SeriesScanOperator implements DataSourceOperator {
   public TsBlock next() {
     if (hasCachedTsBlock || hasNext()) {
       hasCachedTsBlock = false;
-      return tsBlock;
+      TsBlock res = tsBlock;
+      tsBlock = null;
+      return res;
     }
     throw new IllegalStateException("no next batch");
   }
@@ -118,6 +124,16 @@ public class SeriesScanOperator implements DataSourceOperator {
     return finished || (finished = !hasNext());
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return maxReturnSize;
+  }
+
   private boolean readChunkData() throws IOException {
     while (seriesScanUtil.hasNextChunk()) {
       if (readPageData()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index af6b72e47a..22fd2cbabc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -618,7 +618,9 @@ public class SeriesScanUtil {
 
     if (hasCachedNextOverlappedPage) {
       hasCachedNextOverlappedPage = false;
-      return cachedTsBlock;
+      TsBlock res = cachedTsBlock;
+      cachedTsBlock = null;
+      return res;
     } else {
 
       /*