You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/08 12:56:14 UTC

[iotdb] branch MemoryControl created (now 93cea7db31)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch MemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 93cea7db31 Add memory control for some operators

This branch includes the following new commits:

     new 93cea7db31 Add memory control for some operators

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Add memory control for some operators

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch MemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 93cea7db3125ccb0c865f9433856158f3f883e2c
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Aug 8 20:55:30 2022 +0800

    Add memory control for some operators
---
 .../resources/conf/iotdb-datanode.properties       |  6 +--
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 46 ++++++++++++++++------
 .../mpp/execution/memory/LocalMemoryManager.java   |  8 ++--
 .../iotdb/db/mpp/execution/operator/Operator.java  | 19 +++++++++
 .../execution/operator/process/FillOperator.java   | 13 ++++++
 .../execution/operator/process/LimitOperator.java  | 10 +++++
 .../operator/process/LinearFillOperator.java       | 14 +++++++
 .../execution/operator/process/OffsetOperator.java | 10 +++++
 .../execution/operator/process/SortOperator.java   | 10 +++++
 .../process/join/RowBasedTimeJoinOperator.java     | 19 +++++++++
 .../operator/process/join/TimeJoinOperator.java    | 19 +++++++++
 .../process/last/LastQueryCollectOperator.java     | 18 +++++++++
 .../process/last/LastQueryMergeOperator.java       | 30 ++++++++++++++
 .../operator/process/last/LastQueryOperator.java   | 16 ++++++++
 .../process/last/LastQuerySortOperator.java        | 17 ++++++++
 .../process/last/UpdateLastCacheOperator.java      | 10 +++++
 .../operator/source/AlignedSeriesScanOperator.java | 21 +++++++++-
 .../operator/source/ExchangeOperator.java          | 12 ++++++
 .../operator/source/LastCacheScanOperator.java     | 10 +++++
 .../operator/source/SeriesScanOperator.java        | 18 ++++++++-
 .../execution/operator/source/SeriesScanUtil.java  |  4 +-
 21 files changed, 307 insertions(+), 23 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 57448902cd..35a0f89d14 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -637,9 +637,9 @@ timestamp_precision=ms
 # Datatype: boolean
 # meta_data_cache_enable=true
 
-# Read memory Allocation Ratio: BloomFilterCache, ChunkCache, TimeSeriesMetadataCache, memory used for constructing QueryDataSet and Free Memory Used in Query.
-# The parameter form is a:b:c:d:e, where a, b, c, d and e are integers. for example: 1:1:1:1:1 , 1:100:200:300:400
-# chunk_timeseriesmeta_free_memory_proportion=1:100:200:300:400
+# Read memory Allocation Ratio: BloomFilterCache : ChunkCache : TimeSeriesMetadataCache : Coordinator : Operators : DataExchange.
+# The parameter form is a:b:c:d:e:f, where a, b, c, d, e and f are integers. for example: 1:1:1:1:1:1 , 1:100:200:50:300:350
+# chunk_timeseriesmeta_free_memory_proportion=1:100:200:50:300:350
 
 ####################
 ### LAST Cache Configuration
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index cb2583e4d9..17234c37be 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -130,10 +130,7 @@ public class IoTDBConfig {
   private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() * 3 / 10;
 
   /** Memory allocated for the mtree */
-  private long allocateMemoryForSchema = Runtime.getRuntime().maxMemory() * 1 / 10;
-
-  /** Memory allocated for the read process besides cache */
-  private long allocateMemoryForReadWithoutCache = allocateMemoryForRead * 300 / 1001;
+  private long allocateMemoryForSchema = Runtime.getRuntime().maxMemory() / 10;
 
   private volatile int maxQueryDeduplicatedPathNum = 1000;
 
@@ -474,6 +471,15 @@ public class IoTDBConfig {
   /** Memory allocated for chunk cache in read process */
   private long allocateMemoryForChunkCache = allocateMemoryForRead * 100 / 1001;
 
+  /** Memory allocated for coordinator */
+  private long allocateMemoryForCoordinator = allocateMemoryForRead * 50 / 1001;
+
+  /** Memory allocated for operators */
+  private long allocateMemoryForOperators = allocateMemoryForRead * 300 / 1001;
+
+  /** Memory allocated for data exchange */
+  private long allocateMemoryForDataExchange = allocateMemoryForRead * 350 / 1001;
+
   /** Whether to enable Last cache */
   private boolean lastCacheEnable = true;
 
@@ -1715,14 +1721,6 @@ public class IoTDBConfig {
     this.allocateMemoryForRead = allocateMemoryForRead;
   }
 
-  public long getAllocateMemoryForReadWithoutCache() {
-    return allocateMemoryForReadWithoutCache;
-  }
-
-  public void setAllocateMemoryForReadWithoutCache(long allocateMemoryForReadWithoutCache) {
-    this.allocateMemoryForReadWithoutCache = allocateMemoryForReadWithoutCache;
-  }
-
   public boolean isEnableExternalSort() {
     return enableExternalSort;
   }
@@ -1934,6 +1932,30 @@ public class IoTDBConfig {
     this.allocateMemoryForChunkCache = allocateMemoryForChunkCache;
   }
 
+  public long getAllocateMemoryForCoordinator() {
+    return allocateMemoryForCoordinator;
+  }
+
+  public void setAllocateMemoryForCoordinator(long allocateMemoryForCoordinator) {
+    this.allocateMemoryForCoordinator = allocateMemoryForCoordinator;
+  }
+
+  public long getAllocateMemoryForOperators() {
+    return allocateMemoryForOperators;
+  }
+
+  public void setAllocateMemoryForOperators(long allocateMemoryForOperators) {
+    this.allocateMemoryForOperators = allocateMemoryForOperators;
+  }
+
+  public long getAllocateMemoryForDataExchange() {
+    return allocateMemoryForDataExchange;
+  }
+
+  public void setAllocateMemoryForDataExchange(long allocateMemoryForDataExchange) {
+    this.allocateMemoryForDataExchange = allocateMemoryForDataExchange;
+  }
+
   public boolean isLastCacheEnabled() {
     return lastCacheEnable;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java
index d0eff60394..5c6f3c5659 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java
@@ -30,11 +30,9 @@ public class LocalMemoryManager {
   private final MemoryPool queryPool;
 
   public LocalMemoryManager() {
-    queryPool =
-        new MemoryPool(
-            "query",
-            IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForRead(),
-            (long) (IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForRead() * 0.5));
+    long totalMemory = IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForDataExchange();
+    int maxQueryThread = IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread();
+    queryPool = new MemoryPool("query", totalMemory, totalMemory / maxQueryThread);
   }
 
   public MemoryPool getQueryPool() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
index dfa08e033f..7d8765eaa3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
@@ -52,4 +52,23 @@ public interface Operator extends AutoCloseable {
    * Is this operator completely finished processing and no more output TsBlock will be produced.
    */
   boolean isFinished();
+
+  // TODO remove the default while completing all the operators
+  /**
+   * We should also consider the memory used by its children operator, so the calculation logic may
+   * be like: long estimatedOfCurrentOperator = XXXXX; return max(estimatedOfCurrentOperator,
+   * child1.calculateMaxPeekMemory(), child2.calculateMaxPeekMemory(), ....)
+   *
+   * @return estimated max memory footprint that the Operator Tree(rooted from this operator) will
+   *     use while doing its own query processing
+   */
+  default long calculateMaxPeekMemory() {
+    return 0L;
+  }
+
+  // TODO remove the default while completing all the operators
+  /** @return estimated max memory footprint for returned TsBlock when calling operator.next() */
+  default long calculateMaxReturnSize() {
+    return 0L;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
index da7ffe6ab1..94fa08fcd9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
@@ -91,4 +91,17 @@ public class FillOperator implements ProcessOperator {
   public boolean isFinished() {
     return child.isFinished();
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    // while doing constant and previous fill, we may need to copy the corresponding column if there
+    // exists null values
+    // so the max peek memory may be double
+    return 2 * child.calculateMaxPeekMemory();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return child.calculateMaxReturnSize();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
index ef3870fb17..4c5d608902 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
@@ -80,4 +80,14 @@ public class LimitOperator implements ProcessOperator {
   public boolean isFinished() {
     return remainingLimit == 0 || child.isFinished();
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return child.calculateMaxPeekMemory();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return child.calculateMaxReturnSize();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
index fcae23ce7c..ee69557074 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
@@ -160,6 +160,20 @@ public class LinearFillOperator implements ProcessOperator {
     return cachedTsBlock.isEmpty() && child.isFinished();
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    // while doing linear fill, we may need to copy the corresponding column if there exists null
+    // values, and we may also need to cache next TsBlock to get next not null value
+    // so the max peek memory may be triple or more, here we just use 3 as the estimated factor
+    // because in most cases, we will get next not null value in next TsBlock
+    return 3 * child.calculateMaxPeekMemory();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return child.calculateMaxReturnSize();
+  }
+
   /**
    * Judge whether we can use current cached TsBlock to fill Column
    *
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
index 2820992745..a329b25346 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
@@ -79,4 +79,14 @@ public class OffsetOperator implements ProcessOperator {
   public boolean isFinished() {
     return child.isFinished();
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return child.calculateMaxPeekMemory();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return child.calculateMaxReturnSize();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
index 38bda24bf7..a7cd8d046a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
@@ -54,4 +54,14 @@ public class SortOperator implements ProcessOperator {
   public boolean isFinished() {
     return false;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return 0;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return 0;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
index f026ef35c4..386f840d6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerge
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
@@ -244,6 +245,24 @@ public class RowBasedTimeJoinOperator implements ProcessOperator {
     return finished;
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    long maxPeekMemory = calculateMaxReturnSize();
+    long childrenMaxPeekMemory = 0;
+    for (Operator child : children) {
+      maxPeekMemory += child.calculateMaxReturnSize();
+      childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+    }
+    return Math.max(maxPeekMemory, childrenMaxPeekMemory);
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    // time + all value columns
+    return (1L + outputColumnCount)
+        * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+  }
+
   private void updateTimeSelector(int index) {
     timeSelector.add(inputTsBlocks[index].getTimeByIndex(inputIndex[index]));
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
index eb185e09ff..bd5e005acd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerge
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
@@ -243,6 +244,24 @@ public class TimeJoinOperator implements ProcessOperator {
     return finished;
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    long maxPeekMemory = calculateMaxReturnSize();
+    long childrenMaxPeekMemory = 0;
+    for (Operator child : children) {
+      maxPeekMemory += child.calculateMaxReturnSize();
+      childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+    }
+    return Math.max(maxPeekMemory, childrenMaxPeekMemory);
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    // time + all value columns
+    return (1L + outputColumnCount)
+        * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+  }
+
   /**
    * If the tsBlock of columnIndex is null or has no more data in the tsBlock, return true; else
    * return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
index 7f011c32cf..f679bd1769 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
@@ -85,4 +85,22 @@ public class LastQueryCollectOperator implements ProcessOperator {
   public boolean isFinished() {
     return !hasNext();
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    long maxPeekMemory = 0;
+    for (Operator child : children) {
+      maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory());
+    }
+    return maxPeekMemory;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    long maxReturnMemory = 0;
+    for (Operator child : children) {
+      maxReturnMemory = Math.max(maxReturnMemory, child.calculateMaxReturnSize());
+    }
+    return maxReturnMemory;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
index 99deebfd59..9d8be1e23d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
@@ -21,11 +21,13 @@ package org.apache.iotdb.db.mpp.execution.operator.process.last;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.utils.Binary;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import org.openjdk.jol.info.ClassLayout;
 
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -40,6 +42,8 @@ import static org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryU
 // time-series
 public class LastQueryMergeOperator implements ProcessOperator {
 
+  private static final int MAP_NODE_RETRAINED_SIZE = 16;
+
   private final OperatorContext operatorContext;
 
   private final List<Operator> children;
@@ -219,6 +223,30 @@ public class LastQueryMergeOperator implements ProcessOperator {
     return finished;
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    // result size + cached TreeMap size
+    long maxPeekMemory =
+        calculateMaxReturnSize()
+            + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()
+                * (Location.INSTANCE_SIZE + MAP_NODE_RETRAINED_SIZE);
+    long childrenMaxPeekMemory = 0;
+    for (Operator child : children) {
+      maxPeekMemory += child.calculateMaxReturnSize();
+      childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+    }
+    return Math.max(maxPeekMemory, childrenMaxPeekMemory);
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    long maxReturnSize = 0;
+    for (Operator child : children) {
+      maxReturnSize = Math.max(maxReturnSize, child.calculateMaxReturnSize());
+    }
+    return maxReturnSize;
+  }
+
   /**
    * If the tsBlock of columnIndex is null or has no more data in the tsBlock, return true; else
    * return false;
@@ -241,6 +269,8 @@ public class LastQueryMergeOperator implements ProcessOperator {
   }
 
   private static class Location {
+
+    private static final long INSTANCE_SIZE = ClassLayout.parseClass(Location.class).instanceSize();
     int tsBlockIndex;
     int rowIndex;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
index 57d391d5df..40e434ef54 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
@@ -33,6 +33,7 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import static com.google.common.util.concurrent.Futures.successfulAsList;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 // collect all last query result in the same data region and there is no order guarantee
 public class LastQueryOperator implements ProcessOperator {
@@ -140,4 +141,19 @@ public class LastQueryOperator implements ProcessOperator {
   private int getEndIndex() {
     return currentIndex + Math.min(MAX_DETECT_COUNT, inputOperatorsCount - currentIndex);
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    long maxPeekMemory =
+        Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, tsBlockBuilder.getRetainedSizeInBytes());
+    for (Operator child : children) {
+      maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory());
+    }
+    return maxPeekMemory;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, tsBlockBuilder.getRetainedSizeInBytes());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
index 2945856933..1b56d844c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
 
 import static com.google.common.util.concurrent.Futures.successfulAsList;
 import static org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryUtil.compareTimeSeries;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 // collect all last query result in the same data region and sort them according to the
 // time-series's alphabetical order
@@ -190,6 +191,22 @@ public class LastQuerySortOperator implements ProcessOperator {
     return !hasNext();
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    // peak = max returned TsBlock + builder's retained bytes + largest TsBlock from any child
+    long maxChildrenReturnSize = 0;
+    for (Operator child : children) {
+      maxChildrenReturnSize = Math.max(maxChildrenReturnSize, child.calculateMaxReturnSize());
+    }
+    long ownSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + tsBlockBuilder.getRetainedSizeInBytes();
+    return ownSize + maxChildrenReturnSize;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
   private int getEndIndex() {
     return currentIndex + Math.min(MAX_DETECT_COUNT, inputOperatorsCount - currentIndex);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
index 93a5d81cc3..afd543b8dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
@@ -131,4 +131,14 @@ public class UpdateLastCacheOperator implements ProcessOperator {
   public void close() throws Exception {
     child.close();
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return child.calculateMaxPeekMemory();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return child.calculateMaxReturnSize();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
index c47ab9f95d..5b4f07cf24 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
@@ -37,6 +38,8 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
   private boolean hasCachedTsBlock = false;
   private boolean finished = false;
 
+  private final long maxReturnSize;
+
   public AlignedSeriesScanOperator(
       PlanNodeId sourceId,
       AlignedPath seriesPath,
@@ -54,6 +57,10 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
             timeFilter,
             valueFilter,
             ascending);
+    // time + all value columns
+    this.maxReturnSize =
+        (1L + seriesPath.getMeasurementList().size())
+            * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
   }
 
   @Override
@@ -65,7 +72,9 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
   public TsBlock next() {
     if (hasCachedTsBlock || hasNext()) {
       hasCachedTsBlock = false;
-      return tsBlock;
+      TsBlock res = tsBlock;
+      tsBlock = null;
+      return res;
     }
     throw new IllegalStateException("no next batch");
   }
@@ -114,6 +123,16 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
     return finished || (finished = !hasNext());
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return maxReturnSize;
+  }
+
   private boolean readChunkData() throws IOException {
     while (seriesScanUtil.hasNextChunk()) {
       if (readPageData()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
index c72063caeb..d9202de3f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
 public class ExchangeOperator implements SourceOperator {
 
   private final OperatorContext operatorContext;
@@ -62,6 +64,16 @@ public class ExchangeOperator implements SourceOperator {
     return sourceHandle.isFinished();
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
   @Override
   public PlanNodeId getSourceId() {
     return sourceId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
index dfb6d82c5c..6e5ff45cfa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
@@ -57,6 +57,16 @@ public class LastCacheScanOperator implements SourceOperator {
     return !hasNext();
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    return tsBlock.getRetainedSizeInBytes();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return tsBlock.getRetainedSizeInBytes();
+  }
+
   @Override
   public PlanNodeId getSourceId() {
     return sourceId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
index f74d14a2f6..5a58594940 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -38,6 +39,8 @@ public class SeriesScanOperator implements DataSourceOperator {
   private boolean hasCachedTsBlock = false;
   private boolean finished = false;
 
+  private final long maxReturnSize;
+
   public SeriesScanOperator(
       PlanNodeId sourceId,
       PartialPath seriesPath,
@@ -58,6 +61,7 @@ public class SeriesScanOperator implements DataSourceOperator {
             timeFilter,
             valueFilter,
             ascending);
+    this.maxReturnSize = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
   }
 
   @Override
@@ -69,7 +73,9 @@ public class SeriesScanOperator implements DataSourceOperator {
   public TsBlock next() {
     if (hasCachedTsBlock || hasNext()) {
       hasCachedTsBlock = false;
-      return tsBlock;
+      TsBlock res = tsBlock;
+      tsBlock = null;
+      return res;
     }
     throw new IllegalStateException("no next batch");
   }
@@ -118,6 +124,16 @@ public class SeriesScanOperator implements DataSourceOperator {
     return finished || (finished = !hasNext());
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return maxReturnSize;
+  }
+
   private boolean readChunkData() throws IOException {
     while (seriesScanUtil.hasNextChunk()) {
       if (readPageData()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index af6b72e47a..22fd2cbabc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -618,7 +618,9 @@ public class SeriesScanUtil {
 
     if (hasCachedNextOverlappedPage) {
       hasCachedNextOverlappedPage = false;
-      return cachedTsBlock;
+      TsBlock res = cachedTsBlock;
+      cachedTsBlock = null;
+      return res;
     } else {
 
       /*