You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/19 07:21:59 UTC

[iotdb] branch master updated: [IOTDB-4070] Memory control for Query Operators (#6999)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 45923385e3 [IOTDB-4070] Memory control for Query Operators (#6999)
45923385e3 is described below

commit 45923385e3bcc9cdcc71ab66f35c13e07f4d1103
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Fri Aug 19 15:21:54 2022 +0800

    [IOTDB-4070] Memory control for Query Operators (#6999)
---
 .../org/apache/iotdb/it/env/DataNodeWrapper.java   |    2 +
 .../org/apache/iotdb/db/it/IoTDBNestedQueryIT.java |    1 +
 .../it/IoTDBSameMeasurementsDifferentTypesIT.java  |   51 +-
 .../db/it/schema/IoTDBSortedShowTimeseriesIT.java  |    4 +
 .../integration/IoTDBManageTsFileResourceIT.java   |    7 +-
 .../resources/conf/iotdb-datanode.properties       |   14 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   86 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   55 +-
 .../iotdb/db/metadata/LocalSchemaProcessor.java    |    6 +-
 .../db/metadata/rescon/SchemaResourceManager.java  |    4 +-
 ...tatistics.java => SchemaStatisticsManager.java} |   12 +-
 .../schemaregion/SchemaRegionMemoryImpl.java       |   12 +-
 .../schemaregion/SchemaRegionSchemaFileImpl.java   |   12 +-
 .../timerangeiterator/AggrWindowIterator.java      |   23 +
 .../timerangeiterator/ITimeRangeIterator.java      |    2 +
 .../timerangeiterator/PreAggrWindowIterator.java   |   17 +
 .../PreAggrWindowWithNaturalMonthIterator.java     |   16 +
 .../SingleTimeWindowIterator.java                  |    5 +
 .../MemoryNotEnoughException.java}                 |   25 +-
 .../fragment/FragmentInstanceContext.java          |    4 +
 .../mpp/execution/memory/LocalMemoryManager.java   |    4 +-
 .../db/mpp/execution/operator/AggregationUtil.java |   73 +
 .../iotdb/db/mpp/execution/operator/Operator.java  |   22 +
 .../operator/process/AggregationOperator.java      |   42 +-
 .../operator/process/DeviceMergeOperator.java      |   34 +
 .../operator/process/DeviceViewOperator.java       |   28 +
 .../execution/operator/process/FillOperator.java   |   19 +
 .../operator/process/FilterAndProjectOperator.java |  108 ++
 .../execution/operator/process/LimitOperator.java  |   15 +
 .../operator/process/LinearFillOperator.java       |   20 +
 .../execution/operator/process/OffsetOperator.java |   15 +
 .../process/RawDataAggregationOperator.java        |    7 +-
 .../process/SingleInputAggregationOperator.java    |   36 +-
 .../process/SlidingWindowAggregationOperator.java  |    6 +-
 .../execution/operator/process/SortOperator.java   |   15 +
 .../operator/process/TransformOperator.java        |   24 +
 .../process/join/RowBasedTimeJoinOperator.java     |   35 +
 .../operator/process/join/TimeJoinOperator.java    |   35 +
 .../process/last/LastQueryCollectOperator.java     |   28 +
 .../process/last/LastQueryMergeOperator.java       |   47 +
 .../operator/process/last/LastQueryOperator.java   |   26 +
 .../process/last/LastQuerySortOperator.java        |   27 +
 .../process/last/UpdateLastCacheOperator.java      |   15 +
 .../operator/schema/CountMergeOperator.java        |   29 +
 .../operator/schema/DevicesCountOperator.java      |   17 +
 .../schema/LevelTimeSeriesCountOperator.java       |   17 +
 .../schema/NodeManageMemoryMergeOperator.java      |   18 +
 .../operator/schema/NodePathsConvertOperator.java  |   15 +
 .../operator/schema/NodePathsCountOperator.java    |   18 +
 .../schema/NodePathsSchemaScanOperator.java        |   17 +
 .../operator/schema/SchemaFetchMergeOperator.java  |   29 +
 .../operator/schema/SchemaFetchScanOperator.java   |   32 +-
 .../operator/schema/SchemaQueryMergeOperator.java  |   29 +
 .../schema/SchemaQueryOrderByHeatOperator.java     |   40 +
 .../operator/schema/SchemaQueryScanOperator.java   |   35 +-
 .../operator/schema/TimeSeriesCountOperator.java   |   17 +
 .../AbstractSeriesAggregationScanOperator.java     |   38 +-
 .../AlignedSeriesAggregationScanOperator.java      |    9 +-
 .../operator/source/AlignedSeriesScanOperator.java |   26 +-
 .../operator/source/ExchangeOperator.java          |   17 +
 .../operator/source/LastCacheScanOperator.java     |   15 +
 .../source/SeriesAggregationScanOperator.java      |    9 +-
 .../operator/source/SeriesScanOperator.java        |   23 +-
 .../execution/operator/source/SeriesScanUtil.java  |    4 +-
 .../mpp/plan/analyze/GroupByLevelController.java   |   28 +-
 .../iotdb/db/mpp/plan/analyze/TypeProvider.java    |   14 +-
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |   69 +-
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    |    5 +-
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  180 ++-
 .../StatisticsManager.java}                        |   35 +-
 .../TimeseriesStats.java}                          |   25 +-
 .../column/multi/MappableUDFColumnTransformer.java |    4 +
 .../column/ternary/TernaryColumnTransformer.java   |   12 +
 .../dag/column/unary/UnaryColumnTransformer.java   |    4 +
 .../iotdb/db/rescon/TsFileResourceManager.java     |    3 +-
 .../iotdb/db/utils/datastructure/TimeSelector.java |    5 +
 .../db/mpp/aggregation/TimeRangeIteratorTest.java  |    2 +
 .../operator/AggregationOperatorTest.java          |   15 +-
 .../AlignedSeriesAggregationScanOperatorTest.java  |   11 +-
 .../mpp/execution/operator/FillOperatorTest.java   |   30 +
 .../operator/LastQueryMergeOperatorTest.java       |   60 +
 .../execution/operator/LastQueryOperatorTest.java  |   18 +-
 .../operator/LastQuerySortOperatorTest.java        |   18 +-
 .../execution/operator/LinearFillOperatorTest.java |  105 ++
 .../mpp/execution/operator/OperatorMemoryTest.java | 1542 ++++++++++++++++++++
 .../operator/RawDataAggregationOperatorTest.java   |    5 +-
 .../SeriesAggregationScanOperatorTest.java         |    6 +-
 .../SlidingWindowAggregationOperatorTest.java      |   10 +-
 .../operator/UpdateLastCacheOperatorTest.java      |    6 +-
 .../iotdb/db/rescon/ResourceManagerTest.java       |   10 +-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |    2 +
 91 files changed, 3401 insertions(+), 316 deletions(-)

diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java
index c3a9ad7c1f..56a27e24e1 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java
@@ -58,6 +58,8 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
     if (this.targetConfigNode != null) {
       properties.setProperty(IoTDBConstant.TARGET_CONFIG_NODES, this.targetConfigNode);
     }
+    properties.setProperty("max_tsblock_size_in_bytes", "1024");
+    properties.setProperty("page_size_in_byte", "1024");
   }
 
   @Override
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBNestedQueryIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBNestedQueryIT.java
index 413aadd643..3acd717f29 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBNestedQueryIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBNestedQueryIT.java
@@ -371,6 +371,7 @@ public class IoTDBNestedQueryIT {
         statement.executeQuery(query);
       } catch (SQLException e) {
         Assert.assertTrue(
+            e.getMessage(),
             e.getMessage()
                 .contains("The argument of the aggregation function must be a time series."));
       }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSameMeasurementsDifferentTypesIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSameMeasurementsDifferentTypesIT.java
index efa64e4745..82337bd862 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSameMeasurementsDifferentTypesIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSameMeasurementsDifferentTypesIT.java
@@ -47,7 +47,7 @@ import static org.junit.Assert.fail;
 @Category({LocalStandaloneIT.class, ClusterIT.class})
 public class IoTDBSameMeasurementsDifferentTypesIT {
 
-  private static BaseConfig tsFileConfig = ConfigFactory.getConfig();
+  private static final BaseConfig tsFileConfig = ConfigFactory.getConfig();
   private static int maxNumberOfPointsInPage;
   private static int pageSizeInByte;
   private static int groupSizeInByte;
@@ -88,10 +88,6 @@ public class IoTDBSameMeasurementsDifferentTypesIT {
     try (Connection connection = EnvFactory.getEnv().getConnection();
         Statement statement = connection.createStatement()) {
 
-      for (String sql : TestConstant.createSql) {
-        statement.execute(sql);
-      }
-
       statement.execute("SET STORAGE GROUP TO root.fans");
       statement.execute("CREATE TIMESERIES root.fans.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE");
       statement.execute("CREATE TIMESERIES root.fans.d1.s0 WITH DATATYPE=INT64, ENCODING=RLE");
@@ -127,14 +123,13 @@ public class IoTDBSameMeasurementsDifferentTypesIT {
       ResultSet resultSet1 = statement1.executeQuery(selectSql);
       int cnt1 = 0;
       while (resultSet1.next() && cnt1 < 5) {
-        StringBuilder builder = new StringBuilder();
-        builder
-            .append(resultSet1.getString(TestConstant.TIMESTAMP_STR))
-            .append(",")
-            .append(resultSet1.getString("root.fans.d0.s0"))
-            .append(",")
-            .append(resultSet1.getString("root.fans.d1.s0"));
-        Assert.assertEquals(retArray[cnt1], builder.toString());
+        String ans =
+            resultSet1.getString(TestConstant.TIMESTAMP_STR)
+                + ","
+                + resultSet1.getString("root.fans.d0.s0")
+                + ","
+                + resultSet1.getString("root.fans.d1.s0");
+        Assert.assertEquals(retArray[cnt1], ans);
         cnt1++;
       }
 
@@ -142,14 +137,13 @@ public class IoTDBSameMeasurementsDifferentTypesIT {
       ResultSet resultSet2 = statement2.executeQuery(selectSql);
       int cnt2 = 0;
       while (resultSet2.next()) {
-        StringBuilder builder = new StringBuilder();
-        builder
-            .append(resultSet2.getString(TestConstant.TIMESTAMP_STR))
-            .append(",")
-            .append(resultSet2.getString("root.fans.d0.s0"))
-            .append(",")
-            .append(resultSet2.getString("root.fans.d1.s0"));
-        Assert.assertEquals(retArray[cnt2], builder.toString());
+        String ans =
+            resultSet2.getString(TestConstant.TIMESTAMP_STR)
+                + ","
+                + resultSet2.getString("root.fans.d0.s0")
+                + ","
+                + resultSet2.getString("root.fans.d1.s0");
+        Assert.assertEquals(retArray[cnt2], ans);
         cnt2++;
       }
       Assert.assertEquals(9, cnt2);
@@ -158,14 +152,13 @@ public class IoTDBSameMeasurementsDifferentTypesIT {
       // function,
       // and the cursor has been moved to the next position, so we should fetch that value first.
       do {
-        StringBuilder builder = new StringBuilder();
-        builder
-            .append(resultSet1.getString(TestConstant.TIMESTAMP_STR))
-            .append(",")
-            .append(resultSet1.getString("root.fans.d0.s0"))
-            .append(",")
-            .append(resultSet1.getString("root.fans.d1.s0"));
-        Assert.assertEquals(retArray[cnt1], builder.toString());
+        String ans =
+            resultSet1.getString(TestConstant.TIMESTAMP_STR)
+                + ","
+                + resultSet1.getString("root.fans.d0.s0")
+                + ","
+                + resultSet1.getString("root.fans.d1.s0");
+        Assert.assertEquals(retArray[cnt1], ans);
         cnt1++;
       } while (resultSet1.next());
       // Although the statement2 has the same sql as statement1, they shouldn't affect each other.
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBSortedShowTimeseriesIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBSortedShowTimeseriesIT.java
index 0410304308..c1554b081b 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBSortedShowTimeseriesIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBSortedShowTimeseriesIT.java
@@ -189,6 +189,7 @@ public class IoTDBSortedShowTimeseriesIT {
         count++;
       }
       assertEquals(retArray1.size(), count);
+      resultSet.close();
 
       resultSet = statement.executeQuery("show LATEST timeseries");
       count = 0;
@@ -214,6 +215,7 @@ public class IoTDBSortedShowTimeseriesIT {
         count++;
       }
       assertEquals(retArray2.size(), count);
+      resultSet.close();
 
     } catch (Exception e) {
       e.printStackTrace();
@@ -266,6 +268,7 @@ public class IoTDBSortedShowTimeseriesIT {
         count++;
       }
       assertEquals(retSet.size(), count);
+      resultSet.close();
 
     } catch (Exception e) {
       e.printStackTrace();
@@ -313,6 +316,7 @@ public class IoTDBSortedShowTimeseriesIT {
         count++;
       }
       assertEquals(retArray.length, count);
+      resultSet.close();
 
     } catch (Exception e) {
       e.printStackTrace();
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBManageTsFileResourceIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBManageTsFileResourceIT.java
index 41a270e637..96031214e7 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBManageTsFileResourceIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBManageTsFileResourceIT.java
@@ -54,8 +54,7 @@ import static org.junit.Assert.assertTrue;
 public class IoTDBManageTsFileResourceIT {
   private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
   private TsFileResourceManager tsFileResourceManager = TsFileResourceManager.getInstance();
-  private double prevTimeIndexMemoryProportion;
-  private double prevTimeIndexMemoryThreshold;
+  private long prevTimeIndexMemoryThreshold;
   private int prevCompactionThreadNum;
 
   private static String[] unSeqSQLs =
@@ -89,7 +88,7 @@ public class IoTDBManageTsFileResourceIT {
   @Before
   public void setUp() throws ClassNotFoundException {
     EnvironmentUtils.envSetUp();
-    prevTimeIndexMemoryProportion = CONFIG.getTimeIndexMemoryProportion();
+    prevTimeIndexMemoryThreshold = CONFIG.getAllocateMemoryForTimeIndex();
     prevCompactionThreadNum = CONFIG.getConcurrentCompactionThread();
     Class.forName(Config.JDBC_DRIVER_NAME);
   }
@@ -97,8 +96,6 @@ public class IoTDBManageTsFileResourceIT {
   @After
   public void tearDown() throws Exception {
     EnvironmentUtils.cleanEnv();
-    prevTimeIndexMemoryThreshold =
-        prevTimeIndexMemoryProportion * CONFIG.getAllocateMemoryForRead();
     tsFileResourceManager.setTimeIndexMemoryThreshold(prevTimeIndexMemoryThreshold);
     CONFIG.setConcurrentCompactionThread(prevCompactionThreadNum);
   }
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 85dc0ce5ff..42d67f75f0 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -448,10 +448,6 @@ timestamp_precision=ms
 # Datatype: double
 # flush_proportion=0.4
 
-# Ratio of read memory allocated for timeIndex, 0.2 by default
-# Datatype: double
-# time_index_memory_proportion=0.2
-
 # Ratio of write memory allocated for buffered arrays, 0.6 by default
 # Datatype: double
 # buffered_arrays_memory_proportion=0.6
@@ -485,6 +481,10 @@ timestamp_precision=ms
 # Datatype: int
 # io_task_queue_size_for_flushing=10
 
+# If true, we will estimate each query's possible memory footprint before executing it and deny it if its estimated memory exceeds current free memory
+# Datatype: boolean
+# enable_query_memory_estimation=true
+
 ####################
 ### Upgrade Configurations
 ####################
@@ -646,9 +646,9 @@ timestamp_precision=ms
 # Datatype: boolean
 # meta_data_cache_enable=true
 
-# Read memory Allocation Ratio: BloomFilterCache, ChunkCache, TimeSeriesMetadataCache, memory used for constructing QueryDataSet and Free Memory Used in Query.
-# The parameter form is a:b:c:d:e, where a, b, c, d and e are integers. for example: 1:1:1:1:1 , 1:100:200:300:400
-# chunk_timeseriesmeta_free_memory_proportion=1:100:200:300:400
+# Read memory Allocation Ratio: BloomFilterCache : ChunkCache : TimeSeriesMetadataCache : Coordinator : Operators : DataExchange : timeIndex in TsFileResourceList : others.
+# The parameter form is a:b:c:d:e:f:g:h, where a, b, c, d, e, f, g and h are integers. for example: 1:1:1:1:1:1:1:1 , 1:100:200:50:200:200:200:50
+# chunk_timeseriesmeta_free_memory_proportion=1:100:200:50:200:200:200:50
 
 ####################
 ### LAST Cache Configuration
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 3ee6e92b35..379ba1fe6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -137,19 +137,13 @@ public class IoTDBConfig {
   private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() * 3 / 10;
 
   /** Memory allocated for the mtree */
-  private long allocateMemoryForSchema = Runtime.getRuntime().maxMemory() * 1 / 10;
-
-  /** Memory allocated for the read process besides cache */
-  private long allocateMemoryForReadWithoutCache = allocateMemoryForRead * 300 / 1001;
+  private long allocateMemoryForSchema = Runtime.getRuntime().maxMemory() / 10;
 
   private volatile int maxQueryDeduplicatedPathNum = 1000;
 
   /** Ratio of memory allocated for buffered arrays */
   private double bufferedArraysMemoryProportion = 0.6;
 
-  /** Memory allocated proportion for timeIndex */
-  private double timeIndexMemoryProportion = 0.2;
-
   /** Flush proportion for system */
   private double flushProportion = 0.4;
 
@@ -477,6 +471,24 @@ public class IoTDBConfig {
   /** Memory allocated for chunk cache in read process */
   private long allocateMemoryForChunkCache = allocateMemoryForRead * 100 / 1001;
 
+  /** Memory allocated for the coordinator */
+  private long allocateMemoryForCoordinator = allocateMemoryForRead * 50 / 1001;
+
+  /** Memory allocated for operators */
+  private long allocateMemoryForOperators = allocateMemoryForRead * 200 / 1001;
+
+  /** Memory allocated for data exchange */
+  private long allocateMemoryForDataExchange = allocateMemoryForRead * 200 / 1001;
+
+  /** Memory allocated for timeIndex */
+  private long allocateMemoryForTimeIndex = allocateMemoryForRead * 200 / 1001;
+
+  /**
+   * If true, we will estimate each query's possible memory footprint before executing it and deny
+   * it if its estimated memory exceeds current free memory
+   */
+  private boolean enableQueryMemoryEstimation = true;
+
   /** Whether to enable Last cache */
   private boolean lastCacheEnable = true;
 
@@ -1358,6 +1370,10 @@ public class IoTDBConfig {
     this.concurrentSubRawQueryThread = concurrentSubRawQueryThread;
   }
 
+  public long getMaxBytesPerQuery() {
+    return allocateMemoryForDataExchange / concurrentQueryThread;
+  }
+
   public int getRawQueryBlockingQueueCapacity() {
     return rawQueryBlockingQueueCapacity;
   }
@@ -1688,14 +1704,6 @@ public class IoTDBConfig {
     this.bufferedArraysMemoryProportion = bufferedArraysMemoryProportion;
   }
 
-  public double getTimeIndexMemoryProportion() {
-    return timeIndexMemoryProportion;
-  }
-
-  public void setTimeIndexMemoryProportion(double timeIndexMemoryProportion) {
-    this.timeIndexMemoryProportion = timeIndexMemoryProportion;
-  }
-
   public double getFlushProportion() {
     return flushProportion;
   }
@@ -1744,14 +1752,6 @@ public class IoTDBConfig {
     this.allocateMemoryForRead = allocateMemoryForRead;
   }
 
-  public long getAllocateMemoryForReadWithoutCache() {
-    return allocateMemoryForReadWithoutCache;
-  }
-
-  public void setAllocateMemoryForReadWithoutCache(long allocateMemoryForReadWithoutCache) {
-    this.allocateMemoryForReadWithoutCache = allocateMemoryForReadWithoutCache;
-  }
-
   public boolean isEnableExternalSort() {
     return enableExternalSort;
   }
@@ -1963,6 +1963,46 @@ public class IoTDBConfig {
     this.allocateMemoryForChunkCache = allocateMemoryForChunkCache;
   }
 
+  public long getAllocateMemoryForCoordinator() {
+    return allocateMemoryForCoordinator;
+  }
+
+  public void setAllocateMemoryForCoordinator(long allocateMemoryForCoordinator) {
+    this.allocateMemoryForCoordinator = allocateMemoryForCoordinator;
+  }
+
+  public long getAllocateMemoryForOperators() {
+    return allocateMemoryForOperators;
+  }
+
+  public void setAllocateMemoryForOperators(long allocateMemoryForOperators) {
+    this.allocateMemoryForOperators = allocateMemoryForOperators;
+  }
+
+  public long getAllocateMemoryForDataExchange() {
+    return allocateMemoryForDataExchange;
+  }
+
+  public void setAllocateMemoryForDataExchange(long allocateMemoryForDataExchange) {
+    this.allocateMemoryForDataExchange = allocateMemoryForDataExchange;
+  }
+
+  public long getAllocateMemoryForTimeIndex() {
+    return allocateMemoryForTimeIndex;
+  }
+
+  public void setAllocateMemoryForTimeIndex(long allocateMemoryForTimeIndex) {
+    this.allocateMemoryForTimeIndex = allocateMemoryForTimeIndex;
+  }
+
+  public boolean isEnableQueryMemoryEstimation() {
+    return enableQueryMemoryEstimation;
+  }
+
+  public void setEnableQueryMemoryEstimation(boolean enableQueryMemoryEstimation) {
+    this.enableQueryMemoryEstimation = enableQueryMemoryEstimation;
+  }
+
   public boolean isLastCacheEnabled() {
     return lastCacheEnable;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 3d9e81624f..6376b09224 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -200,12 +200,6 @@ public class IoTDBDescriptor {
                   "buffered_arrays_memory_proportion",
                   Double.toString(conf.getBufferedArraysMemoryProportion()))));
 
-      conf.setTimeIndexMemoryProportion(
-          Double.parseDouble(
-              properties.getProperty(
-                  "time_index_memory_proportion",
-                  Double.toString(conf.getTimeIndexMemoryProportion()))));
-
       conf.setFlushProportion(
           Double.parseDouble(
               properties.getProperty(
@@ -1223,6 +1217,16 @@ public class IoTDBDescriptor {
                     "max_tsblock_size_in_bytes",
                     Integer.toString(
                         TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes()))));
+
+    // cap the max TsBlock size at min(configured max TsBlock size, max bytes per query)
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setMaxTsBlockSizeInBytes(
+            (int)
+                Math.min(
+                    TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
+                    conf.getMaxBytesPerQuery()));
+
     TSFileDescriptor.getInstance()
         .getConfig()
         .setMaxTsBlockLineNumber(
@@ -1421,6 +1425,13 @@ public class IoTDBDescriptor {
                   .trim()));
       conf.setIpWhiteList(properties.getProperty("ip_white_list", conf.getIpWhiteList()));
 
+      // update enable query memory estimation for memory control
+      conf.setEnableQueryMemoryEstimation(
+          Boolean.parseBoolean(
+              properties.getProperty(
+                  "enable_query_memory_estimation",
+                  Boolean.toString(conf.isEnableQueryMemoryEstimation()))));
+
       // update wal config
       long prevDeleteWalFilesPeriodInMs = conf.getDeleteWalFilesPeriodInMs();
       loadWALHotModifiedProps(properties);
@@ -1485,9 +1496,11 @@ public class IoTDBDescriptor {
                 "max_deduplicated_path_num",
                 Integer.toString(conf.getMaxQueryDeduplicatedPathNum()))));
 
-    if (!conf.isMetaDataCacheEnable()) {
-      return;
-    }
+    conf.setEnableQueryMemoryEstimation(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_query_memory_estimation",
+                Boolean.toString(conf.isEnableQueryMemoryEstimation()))));
 
     String queryMemoryAllocateProportion =
         properties.getProperty("chunk_timeseriesmeta_free_memory_proportion");
@@ -1506,8 +1519,14 @@ public class IoTDBDescriptor {
               maxMemoryAvailable * Integer.parseInt(proportions[1].trim()) / proportionSum);
           conf.setAllocateMemoryForTimeSeriesMetaDataCache(
               maxMemoryAvailable * Integer.parseInt(proportions[2].trim()) / proportionSum);
-          conf.setAllocateMemoryForReadWithoutCache(
+          conf.setAllocateMemoryForCoordinator(
               maxMemoryAvailable * Integer.parseInt(proportions[3].trim()) / proportionSum);
+          conf.setAllocateMemoryForOperators(
+              maxMemoryAvailable * Integer.parseInt(proportions[4].trim()) / proportionSum);
+          conf.setAllocateMemoryForDataExchange(
+              maxMemoryAvailable * Integer.parseInt(proportions[5].trim()) / proportionSum);
+          conf.setAllocateMemoryForTimeIndex(
+              maxMemoryAvailable * Integer.parseInt(proportions[6].trim()) / proportionSum);
         } catch (Exception e) {
           throw new RuntimeException(
               "Each subsection of configuration item chunkmeta_chunk_timeseriesmeta_free_memory_proportion"
@@ -1516,6 +1535,22 @@ public class IoTDBDescriptor {
         }
       }
     }
+
+    // When the metadata cache is disabled, give its caches' memory to data exchange and operators
+    if (!conf.isMetaDataCacheEnable()) {
+      long sum =
+          conf.getAllocateMemoryForBloomFilterCache()
+              + conf.getAllocateMemoryForChunkCache()
+              + conf.getAllocateMemoryForTimeSeriesMetaDataCache();
+      conf.setAllocateMemoryForBloomFilterCache(0);
+      conf.setAllocateMemoryForChunkCache(0);
+      conf.setAllocateMemoryForTimeSeriesMetaDataCache(0);
+      long partForDataExchange = sum / 2;
+      long partForOperators = sum - partForDataExchange;
+      conf.setAllocateMemoryForDataExchange(
+          conf.getAllocateMemoryForDataExchange() + partForDataExchange);
+      conf.setAllocateMemoryForOperators(conf.getAllocateMemoryForOperators() + partForOperators);
+    }
   }
 
   private void initSchemaMemoryAllocate(Properties properties) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
index 32518a6239..fafd13ddd9 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
@@ -37,7 +37,7 @@ import org.apache.iotdb.db.metadata.mnode.IMNode;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
 import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
-import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
+import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.metadata.template.Template;
@@ -466,7 +466,7 @@ public class LocalSchemaProcessor {
     // todo this is for test assistance, refactor this to support massive timeseries
     if (pathPattern.getFullPath().equals("root.**")
         && TemplateManager.getInstance().getAllTemplateName().isEmpty()) {
-      return (int) TimeseriesStatistics.getInstance().getTotalSeriesNumber();
+      return (int) SchemaStatisticsManager.getInstance().getTotalSeriesNumber();
     }
     int count = 0;
     for (ISchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, isPrefixMatch)) {
@@ -1380,7 +1380,7 @@ public class LocalSchemaProcessor {
 
   @TestOnly
   public long getTotalSeriesNumber() {
-    return TimeseriesStatistics.getInstance().getTotalSeriesNumber();
+    return SchemaStatisticsManager.getInstance().getTotalSeriesNumber();
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
index 0fe18ac955..c5df8ff9a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
@@ -30,7 +30,7 @@ public class SchemaResourceManager {
   private SchemaResourceManager() {}
 
   public static void initSchemaResource() {
-    TimeseriesStatistics.getInstance().init();
+    SchemaStatisticsManager.getInstance().init();
     MemoryStatistics.getInstance().init();
     if (IoTDBDescriptor.getInstance()
         .getConfig()
@@ -41,7 +41,7 @@ public class SchemaResourceManager {
   }
 
   public static void clearSchemaResource() {
-    TimeseriesStatistics.getInstance().clear();
+    SchemaStatisticsManager.getInstance().clear();
     MemoryStatistics.getInstance().clear();
     if (IoTDBDescriptor.getInstance()
         .getConfig()
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java
similarity index 86%
rename from server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java
rename to server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java
index f8a1f21a20..6d63cdfe85 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java
@@ -25,22 +25,22 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
 
 import java.util.concurrent.atomic.AtomicLong;
 
-public class TimeseriesStatistics {
+public class SchemaStatisticsManager {
 
   private final AtomicLong totalSeriesNumber = new AtomicLong();
 
-  private static class TimeseriesStatisticsHolder {
+  private static class SchemaStatisticsHolder {
 
-    private TimeseriesStatisticsHolder() {
+    private SchemaStatisticsHolder() {
       // allowed to do nothing
     }
 
-    private static final TimeseriesStatistics INSTANCE = new TimeseriesStatistics();
+    private static final SchemaStatisticsManager INSTANCE = new SchemaStatisticsManager();
   }
 
   /** we should not use this function in other place, but only in IoTDB class */
-  public static TimeseriesStatistics getInstance() {
-    return TimeseriesStatisticsHolder.INSTANCE;
+  public static SchemaStatisticsManager getInstance() {
+    return SchemaStatisticsHolder.INSTANCE;
   }
 
   public void init() {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index 71c6b3632e..a4c2c8a8fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -52,7 +52,7 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.metadata.mtree.MTreeBelowSGMemoryImpl;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.rescon.MemoryStatistics;
-import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
+import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
 import org.apache.iotdb.db.metadata.tag.TagManager;
 import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.metadata.template.TemplateManager;
@@ -168,7 +168,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
   private boolean usingMLog = true;
   private MLogWriter logWriter;
 
-  private TimeseriesStatistics timeseriesStatistics = TimeseriesStatistics.getInstance();
+  private SchemaStatisticsManager schemaStatisticsManager = SchemaStatisticsManager.getInstance();
   private MemoryStatistics memoryStatistics = MemoryStatistics.getInstance();
 
   private final IStorageGroupMNode storageGroupMNode;
@@ -454,7 +454,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     // collect all the LeafMNode in this schema region
     List<IMeasurementMNode> leafMNodes = mtree.getAllMeasurementMNode();
 
-    timeseriesStatistics.deleteTimeseries(leafMNodes.size());
+    schemaStatisticsManager.deleteTimeseries(leafMNodes.size());
 
     // drop triggers with no exceptions
     TriggerEngine.drop(leafMNodes);
@@ -601,7 +601,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
       mNodeCache.invalidate(path.getDevicePath());
 
       // update statistics and schemaDataTypeNumMap
-      timeseriesStatistics.addTimeseries(1);
+      schemaStatisticsManager.addTimeseries(1);
 
       // update tag index
       if (offset != -1 && isRecovering) {
@@ -718,7 +718,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
       mNodeCache.invalidate(prefixPath);
 
       // update statistics and schemaDataTypeNumMap
-      timeseriesStatistics.addTimeseries(plan.getMeasurements().size());
+      schemaStatisticsManager.addTimeseries(plan.getMeasurements().size());
 
       List<Long> tagOffsets = plan.getTagOffsets();
       for (int i = 0; i < measurements.size(); i++) {
@@ -860,7 +860,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
 
     mNodeCache.invalidate(node.getPartialPath());
 
-    timeseriesStatistics.deleteTimeseries(1);
+    schemaStatisticsManager.deleteTimeseries(1);
     return storageGroupPath;
   }
   // endregion
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index 34c49106b3..57129f5063 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -50,7 +50,7 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.metadata.mtree.MTreeBelowSGCachedImpl;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.rescon.MemoryStatistics;
-import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
+import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
 import org.apache.iotdb.db.metadata.tag.TagManager;
 import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.metadata.template.TemplateManager;
@@ -162,7 +162,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
   private File logFile;
   private MLogWriter logWriter;
 
-  private TimeseriesStatistics timeseriesStatistics = TimeseriesStatistics.getInstance();
+  private SchemaStatisticsManager schemaStatisticsManager = SchemaStatisticsManager.getInstance();
   private MemoryStatistics memoryStatistics = MemoryStatistics.getInstance();
 
   private final IStorageGroupMNode storageGroupMNode;
@@ -413,7 +413,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
     // collect all the LeafMNode in this schema region
     List<IMeasurementMNode> leafMNodes = mtree.getAllMeasurementMNode();
 
-    timeseriesStatistics.deleteTimeseries(leafMNodes.size());
+    schemaStatisticsManager.deleteTimeseries(leafMNodes.size());
 
     // drop triggers with no exceptions
     TriggerEngine.drop(leafMNodes);
@@ -495,7 +495,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
         mNodeCache.invalidate(path.getDevicePath());
 
         // update statistics and schemaDataTypeNumMap
-        timeseriesStatistics.addTimeseries(1);
+        schemaStatisticsManager.addTimeseries(1);
 
         // update tag index
         if (offset != -1 && isRecovering) {
@@ -637,7 +637,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
         mNodeCache.invalidate(prefixPath);
 
         // update statistics and schemaDataTypeNumMap
-        timeseriesStatistics.addTimeseries(plan.getMeasurements().size());
+        schemaStatisticsManager.addTimeseries(plan.getMeasurements().size());
 
         List<Long> tagOffsets = plan.getTagOffsets();
         for (int i = 0; i < measurements.size(); i++) {
@@ -785,7 +785,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
 
     mNodeCache.invalidate(node.getPartialPath());
 
-    timeseriesStatistics.deleteTimeseries(1);
+    schemaStatisticsManager.deleteTimeseries(1);
     return storageGroupPath;
   }
   // endregion
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/AggrWindowIterator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/AggrWindowIterator.java
index f27c56223c..50f50e4175 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/AggrWindowIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/AggrWindowIterator.java
@@ -176,4 +176,27 @@ public class AggrWindowIterator implements ITimeRangeIterator {
   public long currentOutputTime() {
     return leftCRightO ? curTimeRange.getMin() : curTimeRange.getMax();
   }
+
+  @Override
+  public long getTotalIntervalNum() {
+    long queryRange = endTime - startTime;
+    long intervalNum;
+
+    if (isSlidingStepByMonth) {
+      intervalNum = (long) Math.ceil(queryRange / (double) (slidingStep * MS_TO_MONTH));
+      long retStartTime = DatetimeUtils.calcIntervalByMonth(startTime, intervalNum * slidingStep);
+      while (retStartTime > endTime) {
+        intervalNum -= 1;
+        retStartTime = DatetimeUtils.calcIntervalByMonth(startTime, intervalNum * slidingStep);
+      }
+    } else {
+      intervalNum = (long) Math.ceil(queryRange / (double) slidingStep);
+    }
+    return intervalNum;
+  }
+
+  public void reset() {
+    curTimeRange = null;
+    hasCachedTimeRange = false;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/ITimeRangeIterator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/ITimeRangeIterator.java
index ccc999e810..47a6961fd7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/ITimeRangeIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/ITimeRangeIterator.java
@@ -55,4 +55,6 @@ public interface ITimeRangeIterator {
    * @return minTime if leftCloseRightOpen, else maxTime.
    */
   long currentOutputTime();
+
+  long getTotalIntervalNum();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowIterator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowIterator.java
index f7f6297ddb..c0b8c3b000 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowIterator.java
@@ -173,4 +173,21 @@ public class PreAggrWindowIterator implements ITimeRangeIterator {
   public long currentOutputTime() {
     return leftCRightO ? curTimeRange.getMin() : curTimeRange.getMax();
   }
+
+  @Override
+  public long getTotalIntervalNum() {
+    long queryRange = endTime - startTime;
+    if (slidingStep >= interval || interval % slidingStep == 0) {
+      return (long) Math.ceil(queryRange / (double) slidingStep);
+    }
+
+    long interval1 = interval % slidingStep, interval2 = slidingStep - interval % slidingStep;
+    long intervalNum = Math.floorDiv(queryRange, interval1 + interval2);
+    long tmpStartTime = startTime + intervalNum * (interval1 + interval2);
+    if (tmpStartTime + interval1 > endTime) {
+      return intervalNum * 2 + 1;
+    } else {
+      return intervalNum * 2 + 2;
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowWithNaturalMonthIterator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowWithNaturalMonthIterator.java
index d8eede01ef..87f6cb8630 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowWithNaturalMonthIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowWithNaturalMonthIterator.java
@@ -147,4 +147,20 @@ public class PreAggrWindowWithNaturalMonthIterator implements ITimeRangeIterator
   public long currentOutputTime() {
     return leftCRightO ? curTimeRange.getMin() : curTimeRange.getMax();
   }
+
+  @Override
+  public long getTotalIntervalNum() {
+    long tmpInterval = 0;
+    while (hasNextTimeRange()) {
+      tmpInterval++;
+      nextTimeRange();
+    }
+
+    curTimeRange = null;
+    timeBoundaryHeap.clear();
+    aggrWindowIterator.reset();
+    initHeap();
+
+    return tmpInterval;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SingleTimeWindowIterator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SingleTimeWindowIterator.java
index b998557604..4802267afa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SingleTimeWindowIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SingleTimeWindowIterator.java
@@ -71,4 +71,9 @@ public class SingleTimeWindowIterator implements ITimeRangeIterator {
   public long currentOutputTime() {
     return curTimeRange.getMin();
   }
+
+  @Override
+  public long getTotalIntervalNum() {
+    return 1;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/exception/MemoryNotEnoughException.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/exception/MemoryNotEnoughException.java
index d0eff60394..7c5f4803e3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/exception/MemoryNotEnoughException.java
@@ -16,28 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.iotdb.db.mpp.exception;
 
-package org.apache.iotdb.db.mpp.execution.memory;
+import org.apache.iotdb.commons.exception.IoTDBException;
 
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
+public class MemoryNotEnoughException extends IoTDBException {
 
-/**
- * Manages memory of a data node. The memory is divided into two memory pools so that the memory for
- * read and for write can be isolated.
- */
-public class LocalMemoryManager {
-
-  private final MemoryPool queryPool;
-
-  public LocalMemoryManager() {
-    queryPool =
-        new MemoryPool(
-            "query",
-            IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForRead(),
-            (long) (IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForRead() * 0.5));
-  }
-
-  public MemoryPool getQueryPool() {
-    return queryPool;
+  public MemoryNotEnoughException(String message, int errorCode) {
+    super(message, errorCode);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index aff2d66a41..4d0f9d9ffd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -179,4 +179,8 @@ public class FragmentInstanceContext extends QueryContext {
   public long getEndTime() {
     return executionEndTime.get();
   }
+
+  public FragmentInstanceStateMachine getStateMachine() {
+    return stateMachine;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java
index d0eff60394..e3d170cbf7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java
@@ -33,8 +33,8 @@ public class LocalMemoryManager {
     queryPool =
         new MemoryPool(
             "query",
-            IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForRead(),
-            (long) (IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForRead() * 0.5));
+            IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForDataExchange(),
+            IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerQuery());
   }
 
   public MemoryPool getQueryPool() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
index 5465800553..5bbb1adfa0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
@@ -19,20 +19,35 @@
 
 package org.apache.iotdb.db.mpp.execution.operator;
 
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.SingleTimeWindowIterator;
 import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.TimeRangeIteratorFactory;
+import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.statistics.StatisticsManager;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.utils.Pair;
 
+import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static org.apache.iotdb.tsfile.read.common.block.TsBlockUtil.skipPointsOutOfTimeRange;
 
 public class AggregationUtil {
@@ -158,4 +173,62 @@ public class AggregationUtil {
     }
     return true;
   }
+
+  public static long calculateMaxAggregationResultSize(
+      List<? extends AggregationDescriptor> aggregationDescriptors,
+      ITimeRangeIterator timeRangeIterator,
+      TypeProvider typeProvider) {
+    long timeValueColumnsSizePerLine = TimeColumn.SIZE_IN_BYTES_PER_POSITION;
+    for (AggregationDescriptor descriptor : aggregationDescriptors) {
+      List<TSDataType> outPutDataTypes =
+          descriptor.getOutputColumnNames().stream()
+              .map(typeProvider::getType)
+              .collect(Collectors.toList());
+      for (TSDataType tsDataType : outPutDataTypes) {
+        // TODO modify after statistics finish
+        PartialPath mockSeriesPath = new PartialPath();
+        timeValueColumnsSizePerLine += getOutputColumnSizePerLine(tsDataType, mockSeriesPath);
+      }
+    }
+
+    return Math.min(
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        Math.min(
+                TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(),
+                timeRangeIterator.getTotalIntervalNum())
+            * timeValueColumnsSizePerLine);
+  }
+
+  public static long calculateMaxAggregationResultSizeForLastQuery(
+      List<Aggregator> aggregators, PartialPath inputSeriesPath) {
+    long timeValueColumnsSizePerLine = TimeColumn.SIZE_IN_BYTES_PER_POSITION;
+    List<TSDataType> outPutDataTypes =
+        aggregators.stream()
+            .flatMap(aggregator -> Arrays.stream(aggregator.getOutputType()))
+            .collect(Collectors.toList());
+    for (TSDataType tsDataType : outPutDataTypes) {
+      timeValueColumnsSizePerLine += getOutputColumnSizePerLine(tsDataType, inputSeriesPath);
+    }
+    return timeValueColumnsSizePerLine;
+  }
+
+  private static long getOutputColumnSizePerLine(
+      TSDataType tsDataType, PartialPath inputSeriesPath) {
+    switch (tsDataType) {
+      case INT32:
+        return IntColumn.SIZE_IN_BYTES_PER_POSITION;
+      case INT64:
+        return LongColumn.SIZE_IN_BYTES_PER_POSITION;
+      case FLOAT:
+        return FloatColumn.SIZE_IN_BYTES_PER_POSITION;
+      case DOUBLE:
+        return DoubleColumn.SIZE_IN_BYTES_PER_POSITION;
+      case BOOLEAN:
+        return BooleanColumn.SIZE_IN_BYTES_PER_POSITION;
+      case TEXT:
+        return StatisticsManager.getInstance().getMaxBinarySizeInBytes(inputSeriesPath);
+      default:
+        throw new UnsupportedOperationException("Unknown data type " + tsDataType);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
index dfa08e033f..b7b05a9289 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
@@ -52,4 +52,26 @@ public interface Operator extends AutoCloseable {
    * Is this operator completely finished processing and no more output TsBlock will be produced.
    */
   boolean isFinished();
+
+  /**
+   * We should also consider the memory used by its child operators, so the calculation logic may
+   * be like: long estimatedOfCurrentOperator = XXXXX; return max(estimatedOfCurrentOperator,
+   * child1.calculateMaxPeekMemory(), child2.calculateMaxPeekMemory(), ....)
+   *
+   * <p>Each operator's MaxPeekMemory should also take the retained size of each child operator
+   * into account.
+   *
+   * @return estimated max memory footprint that the operator tree (rooted at this operator) will
+   *     use while doing its own query processing
+   */
+  long calculateMaxPeekMemory();
+
+  /** @return estimated max memory footprint of the TsBlock returned by operator.next() */
+  long calculateMaxReturnSize();
+
+  /**
+   * @return this operator's retained size (including all its children's retained size) after
+   *     calling its next() method
+   */
+  long calculateRetainedSizeAfterCallingNext();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 8cf4aef8b0..62215cb9c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -37,7 +36,6 @@ import java.util.concurrent.TimeUnit;
 
 import static com.google.common.util.concurrent.Futures.successfulAsList;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
-import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 
 /**
  * AggregationOperator can process the situation: aggregation of intermediate aggregate result, it
@@ -62,16 +60,20 @@ public class AggregationOperator implements ProcessOperator {
   // using for building result tsBlock
   private final TsBlockBuilder resultTsBlockBuilder;
 
+  private final long maxRetainedSize;
+  private final long childrenRetainedSize;
+  private final long maxReturnSize;
+
   public AggregationOperator(
       OperatorContext operatorContext,
       List<Aggregator> aggregators,
+      ITimeRangeIterator timeRangeIterator,
       List<Operator> children,
-      boolean ascending,
-      GroupByTimeParameter groupByTimeParameter,
-      boolean outputPartialTimeWindow) {
+      long maxReturnSize) {
     this.operatorContext = operatorContext;
     this.children = children;
     this.aggregators = aggregators;
+    this.timeRangeIterator = timeRangeIterator;
 
     this.inputOperatorsCount = children.size();
     this.inputTsBlocks = new TsBlock[inputOperatorsCount];
@@ -80,14 +82,16 @@ public class AggregationOperator implements ProcessOperator {
       canCallNext[i] = false;
     }
 
-    this.timeRangeIterator =
-        initTimeRangeIterator(groupByTimeParameter, ascending, outputPartialTimeWindow);
-
     List<TSDataType> dataTypes = new ArrayList<>();
     for (Aggregator aggregator : aggregators) {
       dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
     }
     this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+
+    this.maxRetainedSize = children.stream().mapToLong(Operator::calculateMaxReturnSize).sum();
+    this.childrenRetainedSize =
+        children.stream().mapToLong(Operator::calculateRetainedSizeAfterCallingNext).sum();
+    this.maxReturnSize = maxReturnSize;
   }
 
   @Override
@@ -95,6 +99,21 @@ public class AggregationOperator implements ProcessOperator {
     return operatorContext;
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    return maxReturnSize + maxRetainedSize + childrenRetainedSize;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return maxRetainedSize + childrenRetainedSize;
+  }
+
   @Override
   public ListenableFuture<?> isBlocked() {
     List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
@@ -123,9 +142,6 @@ public class AggregationOperator implements ProcessOperator {
     long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
     long start = System.nanoTime();
 
-    // reset operator state
-    resultTsBlockBuilder.reset();
-
     while (System.nanoTime() - start < maxRuntime
         && (curTimeRange != null || timeRangeIterator.hasNextTimeRange())
         && !resultTsBlockBuilder.isFull()) {
@@ -149,7 +165,9 @@ public class AggregationOperator implements ProcessOperator {
     }
 
     if (resultTsBlockBuilder.getPositionCount() > 0) {
-      return resultTsBlockBuilder.build();
+      TsBlock resultTsBlock = resultTsBlockBuilder.build();
+      resultTsBlockBuilder.reset();
+      return resultTsBlock;
     } else {
       return null;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
index e0ccbe86ba..511a75991b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator;
@@ -278,4 +279,37 @@ public class DeviceMergeOperator implements ProcessOperator {
     return inputTsBlocks[tsBlockIndex] == null
         || inputTsBlocks[tsBlockIndex].getPositionCount() == 0;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    // timeSelector will cache time, we use a single time column to represent max memory cost
+    long maxPeekMemory = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    // inputTsBlocks will cache all TsBlocks returned by deviceOperators
+    for (Operator operator : deviceOperators) {
+      maxPeekMemory += operator.calculateMaxReturnSize();
+      maxPeekMemory += operator.calculateRetainedSizeAfterCallingNext();
+    }
+    for (Operator operator : deviceOperators) {
+      maxPeekMemory = Math.max(maxPeekMemory, operator.calculateMaxPeekMemory());
+    }
+    return Math.max(maxPeekMemory, calculateMaxReturnSize());
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    // time + all value columns
+    return (1L + dataTypes.size()) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long currentRetainedSize = 0, minChildReturnSize = Long.MAX_VALUE;
+    for (Operator child : deviceOperators) {
+      long maxReturnSize = child.calculateMaxReturnSize();
+      currentRetainedSize += (maxReturnSize + child.calculateRetainedSizeAfterCallingNext());
+      minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
+    }
+    // max cached TsBlock
+    return currentRetainedSize - minChildReturnSize;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
index 127a26a8b6..728da7e7db 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumnBuilder;
@@ -152,4 +153,31 @@ public class DeviceViewOperator implements ProcessOperator {
   public boolean isFinished() {
     return !this.hasNext();
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    long maxPeekMemory = calculateMaxReturnSize() + calculateRetainedSizeAfterCallingNext();
+    for (Operator child : deviceOperators) {
+      maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory());
+    }
+    return maxPeekMemory;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    // null columns would be filled, so the return size equals
+    // (numberOfValueColumns(dataTypes.size() - 1) + 1(timeColumn)) * columnSize + deviceColumnSize,
+    // where the size of the device name column is ignored
+    return (long) (dataTypes.size())
+        * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long sum = 0;
+    for (Operator operator : deviceOperators) {
+      sum += operator.calculateRetainedSizeAfterCallingNext();
+    }
+    return sum;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
index da7ffe6ab1..c182168bd3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
@@ -91,4 +91,23 @@ public class FillOperator implements ProcessOperator {
   public boolean isFinished() {
     return child.isFinished();
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    // While doing constant and previous fill, we may need to copy the corresponding column if
+    // there are null values, so the max peak memory may be doubled (hence the factor of 2
+    // below).
+    return 2 * child.calculateMaxPeekMemory() + child.calculateRetainedSizeAfterCallingNext();
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    // we can safely ignore one line cached in IFill
+    return child.calculateRetainedSizeAfterCallingNext();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return child.calculateMaxReturnSize();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
index 60dc07b2f2..80b75a40e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
@@ -22,7 +22,14 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.binary.BinaryColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.ConstantColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.LeafColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.TimeColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.multi.MappableUDFColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ternary.TernaryColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.unary.UnaryColumnTransformer;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
@@ -199,4 +206,105 @@ public class FilterAndProjectOperator implements ProcessOperator {
   public ListenableFuture<?> isBlocked() {
     return inputOperator.isBlocked();
   }
+
+  /**
+   * Estimates the peak memory of this operator.
+   *
+   * <p>The result is the larger of the input's maximum return size and the number of columns
+   * that may be cached at once (one page each), plus whatever the input retains after {@code
+   * next()}.
+   *
+   * @return the estimated peak memory in bytes
+   */
+  @Override
+  public long calculateMaxPeekMemory() {
+    final long inputMaxReturnSize = inputOperator.calculateMaxReturnSize();
+
+    // With a filter, the filter tree and the common sub-expressions give the starting count;
+    // a projection-only operator starts from zero.
+    int cachedColumnCount =
+        hasFilter
+            ? Math.max(
+                1 + getMaxLevelOfColumnTransformerTree(filterOutputTransformer),
+                1 + commonTransformerList.size())
+            : 0;
+
+    // Projection trees are counted unless a filter is combined with a non-mappable UDF.
+    if (!hasFilter || !hasNonMappableUDF) {
+      final int projectionCount = projectOutputTransformerList.size();
+      for (int index = 0; index < projectionCount; index++) {
+        ColumnTransformer projection = projectOutputTransformerList.get(index);
+        int needed = 1 + index + getMaxLevelOfColumnTransformerTree(projection);
+        cachedColumnCount = Math.max(cachedColumnCount, needed);
+      }
+    }
+
+    final long cachedColumnBytes =
+        (long) cachedColumnCount * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    return Math.max(inputMaxReturnSize, cachedColumnBytes)
+        + inputOperator.calculateRetainedSizeAfterCallingNext();
+  }
+
+  /**
+   * Estimates the largest TsBlock this operator can return: the time column plus every value
+   * column, one page each.
+   *
+   * @return the estimated maximum return size in bytes
+   */
+  @Override
+  public long calculateMaxReturnSize() {
+    final int valueColumnCount;
+    if (hasFilter && hasNonMappableUDF) {
+      // only the filtered block is produced, so count the filter builder's columns
+      valueColumnCount = filterTsBlockBuilder.getValueColumnBuilders().length;
+    } else {
+      valueColumnCount = projectOutputTransformerList.size();
+    }
+    return (long) (1 + valueColumnCount)
+        * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+  }
+
+  /**
+   * Reports the memory still held once {@code next()} has returned.
+   *
+   * @return the input operator's retained size, forwarded unchanged
+   */
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    final long inputRetainedSize = inputOperator.calculateRetainedSizeAfterCallingNext();
+    return inputRetainedSize;
+  }
+
+  /**
+   * Walks a calc tree and returns the column count that {@link #calculateMaxPeekMemory()} uses
+   * as the tree's contribution to the cached-column estimate.
+   *
+   * <p>Each node kind has a floor (unary 2, binary 3, ternary 4, mappable UDF 1 + input count);
+   * the result is the larger of that floor and the deepest child's value.
+   *
+   * @param columnTransformer root of the (sub)tree to inspect
+   * @return the estimated column count for this tree
+   * @throws UnsupportedOperationException if the node is of an unknown transformer kind
+   */
+  private int getMaxLevelOfColumnTransformerTree(ColumnTransformer columnTransformer) {
+    if (columnTransformer instanceof LeafColumnTransformer) {
+      // Time column is always calculated, we ignore it here. Constant column is ignored.
+      boolean ignoredLeaf =
+          columnTransformer instanceof ConstantColumnTransformer
+              || columnTransformer instanceof TimeColumnTransformer;
+      return ignoredLeaf ? 0 : 1;
+    }
+
+    if (columnTransformer instanceof UnaryColumnTransformer) {
+      UnaryColumnTransformer unary = (UnaryColumnTransformer) columnTransformer;
+      return Math.max(2, getMaxLevelOfColumnTransformerTree(unary.getChildColumnTransformer()));
+    }
+
+    if (columnTransformer instanceof BinaryColumnTransformer) {
+      BinaryColumnTransformer binary = (BinaryColumnTransformer) columnTransformer;
+      int leftLevel = getMaxLevelOfColumnTransformerTree(binary.getLeftTransformer());
+      int rightLevel = getMaxLevelOfColumnTransformerTree(binary.getRightTransformer());
+      return Math.max(3, Math.max(leftLevel, rightLevel));
+    }
+
+    if (columnTransformer instanceof TernaryColumnTransformer) {
+      TernaryColumnTransformer ternary = (TernaryColumnTransformer) columnTransformer;
+      int firstLevel = getMaxLevelOfColumnTransformerTree(ternary.getFirstColumnTransformer());
+      int secondLevel = getMaxLevelOfColumnTransformerTree(ternary.getSecondColumnTransformer());
+      int thirdLevel = getMaxLevelOfColumnTransformerTree(ternary.getThirdColumnTransformer());
+      return Math.max(4, Math.max(firstLevel, Math.max(secondLevel, thirdLevel)));
+    }
+
+    if (columnTransformer instanceof MappableUDFColumnTransformer) {
+      ColumnTransformer[] inputs =
+          ((MappableUDFColumnTransformer) columnTransformer).getInputColumnTransformers();
+      int deepestInput = 0;
+      for (ColumnTransformer input : inputs) {
+        deepestInput = Math.max(deepestInput, getMaxLevelOfColumnTransformerTree(input));
+      }
+      return Math.max(1 + inputs.length, deepestInput);
+    }
+
+    throw new UnsupportedOperationException("Unsupported ColumnTransformer");
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
index ef3870fb17..726f5e7ecf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
@@ -80,4 +80,19 @@ public class LimitOperator implements ProcessOperator {
   public boolean isFinished() {
     return remainingLimit == 0 || child.isFinished();
   }
+
+  /**
+   * Reports the peak memory of this operator.
+   *
+   * @return the child's peak memory, forwarded unchanged
+   */
+  @Override
+  public long calculateMaxPeekMemory() {
+    final long childPeakMemory = child.calculateMaxPeekMemory();
+    return childPeakMemory;
+  }
+
+  /**
+   * Reports the largest result this operator can return.
+   *
+   * @return the child's maximum return size, forwarded unchanged
+   */
+  @Override
+  public long calculateMaxReturnSize() {
+    final long childMaxReturnSize = child.calculateMaxReturnSize();
+    return childMaxReturnSize;
+  }
+
+  /**
+   * Reports the memory still held once {@code next()} has returned.
+   *
+   * @return the child's retained size, forwarded unchanged
+   */
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    final long childRetainedSize = child.calculateRetainedSizeAfterCallingNext();
+    return childRetainedSize;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
index fcae23ce7c..bcbb92a932 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
@@ -160,6 +160,26 @@ public class LinearFillOperator implements ProcessOperator {
     return cachedTsBlock.isEmpty() && child.isFinished();
   }
 
+  /**
+   * Estimates the peak memory of a linear fill.
+   *
+   * <p>Filling may copy a column that contains nulls and may also cache the next TsBlock to find
+   * the next non-null value, so the real peak can be triple the child's or more. A factor of 3
+   * is used as the estimate because in most cases the next non-null value is found in the next
+   * TsBlock.
+   *
+   * @return three times the child's peak memory plus the child's retained size
+   */
+  @Override
+  public long calculateMaxPeekMemory() {
+    final long tripledChildPeak = 3 * child.calculateMaxPeekMemory();
+    return tripledChildPeak + child.calculateRetainedSizeAfterCallingNext();
+  }
+
+  /**
+   * Reports the largest result this operator can return.
+   *
+   * @return the child's maximum return size, forwarded unchanged
+   */
+  @Override
+  public long calculateMaxReturnSize() {
+    final long childMaxReturnSize = child.calculateMaxReturnSize();
+    return childMaxReturnSize;
+  }
+
+  /**
+   * Reports the memory still held once {@code next()} has returned.
+   *
+   * @return the child's retained size; the two lines cached in LinearFill are deliberately
+   *     ignored
+   */
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    final long childRetainedSize = child.calculateRetainedSizeAfterCallingNext();
+    return childRetainedSize;
+  }
+
   /**
    * Judge whether we can use current cached TsBlock to fill Column
    *
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
index 2820992745..572738d081 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
@@ -79,4 +79,19 @@ public class OffsetOperator implements ProcessOperator {
   public boolean isFinished() {
     return child.isFinished();
   }
+
+  /**
+   * Reports the peak memory of this operator.
+   *
+   * @return the child's peak memory, forwarded unchanged
+   */
+  @Override
+  public long calculateMaxPeekMemory() {
+    final long childPeakMemory = child.calculateMaxPeekMemory();
+    return childPeakMemory;
+  }
+
+  /**
+   * Reports the largest result this operator can return.
+   *
+   * @return the child's maximum return size, forwarded unchanged
+   */
+  @Override
+  public long calculateMaxReturnSize() {
+    final long childMaxReturnSize = child.calculateMaxReturnSize();
+    return childMaxReturnSize;
+  }
+
+  /**
+   * Reports the memory still held once {@code next()} has returned.
+   *
+   * @return the child's retained size, forwarded unchanged
+   */
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    final long childRetainedSize = child.calculateRetainedSizeAfterCallingNext();
+    return childRetainedSize;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index 88fb5fe20a..d95968850d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -20,9 +20,9 @@
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.utils.Pair;
 
@@ -45,10 +45,11 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
   public RawDataAggregationOperator(
       OperatorContext operatorContext,
       List<Aggregator> aggregators,
+      ITimeRangeIterator timeRangeIterator,
       Operator child,
       boolean ascending,
-      GroupByTimeParameter groupByTimeParameter) {
-    super(operatorContext, aggregators, child, ascending, groupByTimeParameter, true);
+      long maxReturnSize) {
+    super(operatorContext, aggregators, child, ascending, timeRangeIterator, maxReturnSize);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
index c59de13988..8632041ee9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -37,7 +36,6 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
-import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 
 public abstract class SingleInputAggregationOperator implements ProcessOperator {
 
@@ -57,26 +55,30 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
   // using for building result tsBlock
   protected final TsBlockBuilder resultTsBlockBuilder;
 
+  protected final long maxRetainedSize;
+  protected final long maxReturnSize;
+
   public SingleInputAggregationOperator(
       OperatorContext operatorContext,
       List<Aggregator> aggregators,
       Operator child,
       boolean ascending,
-      GroupByTimeParameter groupByTimeParameter,
-      boolean outputPartialTimeWindow) {
+      ITimeRangeIterator timeRangeIterator,
+      long maxReturnSize) {
     this.operatorContext = operatorContext;
     this.ascending = ascending;
     this.child = child;
     this.aggregators = aggregators;
-
-    this.timeRangeIterator =
-        initTimeRangeIterator(groupByTimeParameter, ascending, outputPartialTimeWindow);
+    this.timeRangeIterator = timeRangeIterator;
 
     List<TSDataType> dataTypes = new ArrayList<>();
     for (Aggregator aggregator : aggregators) {
       dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
     }
     this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+
+    this.maxRetainedSize = child.calculateMaxReturnSize();
+    this.maxReturnSize = maxReturnSize;
   }
 
   @Override
@@ -101,7 +103,6 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
     long start = System.nanoTime();
 
     // reset operator state
-    resultTsBlockBuilder.reset();
     canCallNext = true;
 
     while (System.nanoTime() - start < maxRuntime
@@ -124,7 +125,9 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
     }
 
     if (resultTsBlockBuilder.getPositionCount() > 0) {
-      return resultTsBlockBuilder.build();
+      TsBlock resultTsBlock = resultTsBlockBuilder.build();
+      resultTsBlockBuilder.reset();
+      return resultTsBlock;
     } else {
       return null;
     }
@@ -146,4 +149,19 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
     curTimeRange = null;
     appendAggregationResult(resultTsBlockBuilder, aggregators, timeRangeIterator);
   }
+
+  /**
+   * Estimates the peak memory of this aggregation.
+   *
+   * @return the configured maximum return size, plus {@code maxRetainedSize} (the child's
+   *     maximum return size captured in the constructor), plus the child's retained size
+   */
+  @Override
+  public long calculateMaxPeekMemory() {
+    final long ownFootprint = maxReturnSize + maxRetainedSize;
+    return ownFootprint + child.calculateRetainedSizeAfterCallingNext();
+  }
+
+  /**
+   * Reports the largest result this operator can return.
+   *
+   * @return the {@code maxReturnSize} supplied to the constructor
+   */
+  @Override
+  public long calculateMaxReturnSize() {
+    return this.maxReturnSize;
+  }
+
+  /**
+   * Reports the memory still held once {@code next()} has returned.
+   *
+   * @return {@code maxRetainedSize} (the child's maximum return size captured in the
+   *     constructor) plus the child's own retained size
+   */
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    final long childRetainedSize = child.calculateRetainedSizeAfterCallingNext();
+    return maxRetainedSize + childRetainedSize;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
index 10303016c5..d73491d3e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -40,10 +40,12 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper
   public SlidingWindowAggregationOperator(
       OperatorContext operatorContext,
       List<Aggregator> aggregators,
+      ITimeRangeIterator timeRangeIterator,
       Operator child,
       boolean ascending,
-      GroupByTimeParameter groupByTimeParameter) {
-    super(operatorContext, aggregators, child, ascending, groupByTimeParameter, false);
+      GroupByTimeParameter groupByTimeParameter,
+      long maxReturnSize) {
+    super(operatorContext, aggregators, child, ascending, timeRangeIterator, maxReturnSize);
     checkArgument(
         groupByTimeParameter != null,
         "GroupByTimeParameter cannot be null in SlidingWindowAggregationOperator");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
index 38bda24bf7..5fa693b2a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
@@ -54,4 +54,19 @@ public class SortOperator implements ProcessOperator {
   public boolean isFinished() {
     return false;
   }
+
+  /**
+   * Reports the peak memory of this operator.
+   *
+   * <p>NOTE(review): a constant zero is reported; this looks like a placeholder rather than a
+   * real estimate - confirm before relying on it.
+   *
+   * @return always {@code 0}
+   */
+  @Override
+  public long calculateMaxPeekMemory() {
+    return 0L;
+  }
+
+  /**
+   * Reports the largest result this operator can return.
+   *
+   * <p>NOTE(review): a constant zero is reported; this looks like a placeholder rather than a
+   * real estimate - confirm before relying on it.
+   *
+   * @return always {@code 0}
+   */
+  @Override
+  public long calculateMaxReturnSize() {
+    return 0L;
+  }
+
+  /**
+   * Reports the memory still held once {@code next()} has returned.
+   *
+   * <p>NOTE(review): a constant zero is reported; this looks like a placeholder rather than a
+   * real estimate - confirm before relying on it.
+   *
+   * @return always {@code 0}
+   */
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
index 0e08b682f1..13f97fe8c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.mpp.transformation.dag.input.QueryDataSetInputLayer;
 import org.apache.iotdb.db.mpp.transformation.dag.input.TsBlockInputDataSet;
 import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFContext;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -350,4 +351,27 @@ public class TransformOperator implements ProcessOperator {
   public OperatorContext getOperatorContext() {
     return operatorContext;
   }
+
+  /**
+   * Estimates the peak memory of this operator using the maximum UDF budgets.
+   *
+   * <p>The collector and transformer budgets are expressed in megabytes (going by the field
+   * names; their declarations are outside this hunk - TODO confirm), while every operator size
+   * is in bytes. The budgets are therefore converted to bytes before being added; adding the raw
+   * MB figures would under-report the peak by a factor of about one million.
+   *
+   * @return the UDF collector and transformer budgets in bytes plus the input's maximum return
+   *     size
+   */
+  @Override
+  public long calculateMaxPeekMemory() {
+    // here we use maximum estimated memory usage
+    final long udfBudgetInBytes =
+        (long)
+            ((udfCollectorMemoryBudgetInMB + udfTransformerMemoryBudgetInMB) * (1024L * 1024L));
+    // keep the byte count in long arithmetic so it is not rounded through a floating type
+    return udfBudgetInBytes + inputOperator.calculateMaxReturnSize();
+  }
+
+  /**
+   * Estimates the largest TsBlock this operator can return: the time column plus one value
+   * column per transformer, one page each.
+   *
+   * @return the estimated maximum return size in bytes
+   */
+  @Override
+  public long calculateMaxReturnSize() {
+    final int columnCount = 1 + transformers.length;
+    return (long) columnCount * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+  }
+
+  /**
+   * Reports the memory still held once {@code next()} has returned.
+   *
+   * <p>The collector may cache points, so its whole budget is counted. That budget is expressed
+   * in megabytes (going by the field name; its declaration is outside this hunk - TODO confirm)
+   * and is converted to bytes so it can be added to the byte-based input size.
+   *
+   * @return the input's retained size plus the UDF collector budget in bytes
+   */
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    // Collector may cache points, here we use maximum usage
+    final long collectorBudgetInBytes = (long) (udfCollectorMemoryBudgetInMB * (1024L * 1024L));
+    return inputOperator.calculateRetainedSizeAfterCallingNext() + collectorBudgetInBytes;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
index f026ef35c4..695893d400 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerge
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
@@ -244,6 +245,40 @@ public class RowBasedTimeJoinOperator implements ProcessOperator {
     return finished;
   }
 
+  /**
+   * Estimates the peak memory of this join.
+   *
+   * <p>Children are visited in order. While child {@code i} is running, the return size and
+   * retained size of every earlier child are already counted, so the candidate peak is that
+   * running total plus child {@code i}'s own peak. After the last child, the candidate is the
+   * total over all children plus this operator's own result.
+   *
+   * @return the larger of the two candidates
+   */
+  @Override
+  public long calculateMaxPeekMemory() {
+    long heldByEarlierChildren = 0;
+    long peakWhileRunningChild = 0;
+    for (Operator child : children) {
+      long candidate = heldByEarlierChildren + child.calculateMaxPeekMemory();
+      peakWhileRunningChild = Math.max(peakWhileRunningChild, candidate);
+      heldByEarlierChildren +=
+          child.calculateMaxReturnSize() + child.calculateRetainedSizeAfterCallingNext();
+    }
+
+    final long peakWhileBuildingResult = heldByEarlierChildren + calculateMaxReturnSize();
+    return Math.max(peakWhileBuildingResult, peakWhileRunningChild);
+  }
+
+  /**
+   * Estimates the largest TsBlock this join can return: the time column plus all output value
+   * columns, one page each.
+   *
+   * @return the estimated maximum return size in bytes
+   */
+  @Override
+  public long calculateMaxReturnSize() {
+    final long columnCount = 1L + outputColumnCount;
+    return columnCount * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+  }
+
+  /**
+   * Reports the memory still held once {@code next()} has returned.
+   *
+   * <p>Each child contributes its maximum return size and its own retained size; the smallest
+   * child return size is then subtracted from that total.
+   *
+   * <p>With no children nothing is subtracted and the result is {@code 0}, instead of the
+   * meaningless {@code -Long.MAX_VALUE} that the unguarded subtraction would produce.
+   *
+   * @return the estimated retained size in bytes
+   */
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long currentRetainedSize = 0;
+    long minChildReturnSize = Long.MAX_VALUE;
+    boolean hasChild = false;
+    for (Operator child : children) {
+      hasChild = true;
+      long maxReturnSize = child.calculateMaxReturnSize();
+      currentRetainedSize += (maxReturnSize + child.calculateRetainedSizeAfterCallingNext());
+      minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
+    }
+    if (!hasChild) {
+      // the Long.MAX_VALUE sentinel was never replaced; there is nothing to subtract
+      return currentRetainedSize;
+    }
+    // max cached TsBlock
+    return currentRetainedSize - minChildReturnSize;
+  }
+
   private void updateTimeSelector(int index) {
     timeSelector.add(inputTsBlocks[index].getTimeByIndex(inputIndex[index]));
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
index eb185e09ff..3170838986 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerge
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
@@ -243,6 +244,40 @@ public class TimeJoinOperator implements ProcessOperator {
     return finished;
   }
 
+  /**
+   * Estimates the peak memory of this join.
+   *
+   * <p>Children are visited in order. While child {@code i} is running, the return size and
+   * retained size of every earlier child are already counted, so the candidate peak is that
+   * running total plus child {@code i}'s own peak. After the last child, the candidate is the
+   * total over all children plus this operator's own result.
+   *
+   * @return the larger of the two candidates
+   */
+  @Override
+  public long calculateMaxPeekMemory() {
+    long heldByEarlierChildren = 0;
+    long peakWhileRunningChild = 0;
+    for (Operator child : children) {
+      long candidate = heldByEarlierChildren + child.calculateMaxPeekMemory();
+      peakWhileRunningChild = Math.max(peakWhileRunningChild, candidate);
+      heldByEarlierChildren +=
+          child.calculateMaxReturnSize() + child.calculateRetainedSizeAfterCallingNext();
+    }
+
+    final long peakWhileBuildingResult = heldByEarlierChildren + calculateMaxReturnSize();
+    return Math.max(peakWhileBuildingResult, peakWhileRunningChild);
+  }
+
+  /**
+   * Estimates the largest TsBlock this join can return: the time column plus all output value
+   * columns, one page each.
+   *
+   * @return the estimated maximum return size in bytes
+   */
+  @Override
+  public long calculateMaxReturnSize() {
+    final long columnCount = 1L + outputColumnCount;
+    return columnCount * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+  }
+
+  /**
+   * Reports the memory still held once {@code next()} has returned.
+   *
+   * <p>Each child contributes its maximum return size and its own retained size; the smallest
+   * child return size is then subtracted from that total.
+   *
+   * <p>With no children nothing is subtracted and the result is {@code 0}, instead of the
+   * meaningless {@code -Long.MAX_VALUE} that the unguarded subtraction would produce.
+   *
+   * @return the estimated retained size in bytes
+   */
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long currentRetainedSize = 0;
+    long minChildReturnSize = Long.MAX_VALUE;
+    boolean hasChild = false;
+    for (Operator child : children) {
+      hasChild = true;
+      long maxReturnSize = child.calculateMaxReturnSize();
+      currentRetainedSize += (maxReturnSize + child.calculateRetainedSizeAfterCallingNext());
+      minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
+    }
+    if (!hasChild) {
+      // the Long.MAX_VALUE sentinel was never replaced; there is nothing to subtract
+      return currentRetainedSize;
+    }
+    // max cached TsBlock
+    return currentRetainedSize - minChildReturnSize;
+  }
+
   /**
    * If the tsBlock of columnIndex is null or has no more data in the tsBlock, return true; else
    * return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
index 7f011c32cf..b91e346372 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
@@ -85,4 +85,32 @@ public class LastQueryCollectOperator implements ProcessOperator {
   public boolean isFinished() {
     return !hasNext();
   }
+
+  /**
+   * Estimates the peak memory of this operator.
+   *
+   * @return the largest value found among every child's peak memory and every child's retained
+   *     size, or {@code 0} when there are no children
+   */
+  @Override
+  public long calculateMaxPeekMemory() {
+    long largest = 0;
+    for (Operator child : children) {
+      long childPeak = child.calculateMaxPeekMemory();
+      long childRetained = child.calculateRetainedSizeAfterCallingNext();
+      largest = Math.max(largest, Math.max(childPeak, childRetained));
+    }
+    return largest;
+  }
+
+  /**
+   * Reports the largest result this operator can return.
+   *
+   * @return the largest maximum return size among the children, or {@code 0} when there are
+   *     none
+   */
+  @Override
+  public long calculateMaxReturnSize() {
+    long largestChildReturnSize = 0;
+    for (Operator child : children) {
+      long childReturnSize = child.calculateMaxReturnSize();
+      if (childReturnSize > largestChildReturnSize) {
+        largestChildReturnSize = childReturnSize;
+      }
+    }
+    return largestChildReturnSize;
+  }
+
+  /**
+   * Reports the memory still held once {@code next()} has returned.
+   *
+   * @return the sum of every child's retained size
+   */
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long totalRetained = 0;
+    for (Operator child : children) {
+      totalRetained = totalRetained + child.calculateRetainedSizeAfterCallingNext();
+    }
+    return totalRetained;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
index 99deebfd59..5ad04bdc2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
@@ -21,11 +21,13 @@ package org.apache.iotdb.db.mpp.execution.operator.process.last;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.utils.Binary;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import org.openjdk.jol.info.ClassLayout;
 
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -40,6 +42,8 @@ import static org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryU
 // time-series
 public class LastQueryMergeOperator implements ProcessOperator {
 
+  public static final long MAP_NODE_RETRAINED_SIZE = 16L + Location.INSTANCE_SIZE;
+
   private final OperatorContext operatorContext;
 
   private final List<Operator> children;
@@ -219,6 +223,47 @@ public class LastQueryMergeOperator implements ProcessOperator {
     return finished;
   }
 
+  /**
+   * Estimates the peak memory of this merge.
+   *
+   * <p>Children are visited in order. While child {@code i} is running, the return size and
+   * retained size of every earlier child are already counted, so the candidate peak is that
+   * running total plus child {@code i}'s own peak. After the last child, the candidate is the
+   * total over all children plus this operator's result and the cached TreeMap (one map node
+   * per line of a maximum-sized TsBlock).
+   *
+   * @return the larger of the two candidates
+   */
+  @Override
+  public long calculateMaxPeekMemory() {
+    long heldByEarlierChildren = 0;
+    long peakWhileRunningChild = 0;
+    for (Operator child : children) {
+      long candidate = heldByEarlierChildren + child.calculateMaxPeekMemory();
+      peakWhileRunningChild = Math.max(peakWhileRunningChild, candidate);
+      heldByEarlierChildren +=
+          child.calculateMaxReturnSize() + child.calculateRetainedSizeAfterCallingNext();
+    }
+
+    // result size + cached TreeMap size
+    final long cachedTreeMapSize =
+        TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()
+            * MAP_NODE_RETRAINED_SIZE;
+    final long peakWhileMerging =
+        heldByEarlierChildren + (calculateMaxReturnSize() + cachedTreeMapSize);
+    return Math.max(peakWhileMerging, peakWhileRunningChild);
+  }
+
+  /**
+   * Reports the largest result this operator can return.
+   *
+   * @return the largest maximum return size among the children, or {@code 0} when there are
+   *     none
+   */
+  @Override
+  public long calculateMaxReturnSize() {
+    long largestChildReturnSize = 0;
+    for (Operator child : children) {
+      long childReturnSize = child.calculateMaxReturnSize();
+      if (childReturnSize > largestChildReturnSize) {
+        largestChildReturnSize = childReturnSize;
+      }
+    }
+    return largestChildReturnSize;
+  }
+
+  /**
+   * Reports the memory still held once {@code next()} has returned.
+   *
+   * <p>Each child contributes its maximum return size and its own retained size; the smallest
+   * child return size is then subtracted from that total. The cached TreeMap is added on top,
+   * sized as one map node per line of a maximum-sized TsBlock.
+   *
+   * <p>With no children nothing is subtracted, so only the TreeMap budget is reported, instead
+   * of the large negative value that subtracting the {@code Long.MAX_VALUE} sentinel would
+   * produce.
+   *
+   * @return the estimated retained size in bytes
+   */
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long childrenSum = 0;
+    long minChildReturnSize = Long.MAX_VALUE;
+    boolean hasChild = false;
+    for (Operator child : children) {
+      hasChild = true;
+      long maxReturnSize = child.calculateMaxReturnSize();
+      childrenSum += (maxReturnSize + child.calculateRetainedSizeAfterCallingNext());
+      minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
+    }
+    // without children the sentinel was never replaced; there is nothing to subtract
+    final long cachedTsBlockSize = hasChild ? childrenSum - minChildReturnSize : childrenSum;
+    // max cached TsBlock + cached TreeMap size
+    return cachedTsBlockSize
+        + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()
+            * MAP_NODE_RETRAINED_SIZE;
+  }
+
   /**
    * If the tsBlock of columnIndex is null or has no more data in the tsBlock, return true; else
    * return false;
@@ -241,6 +286,8 @@ public class LastQueryMergeOperator implements ProcessOperator {
   }
 
   private static class Location {
+
+    private static final int INSTANCE_SIZE = ClassLayout.parseClass(Location.class).instanceSize();
     int tsBlockIndex;
     int rowIndex;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
index 57d391d5df..60620b1f56 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
@@ -33,6 +33,7 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import static com.google.common.util.concurrent.Futures.successfulAsList;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 // collect all last query result in the same data region and there is no order guarantee
 public class LastQueryOperator implements ProcessOperator {
@@ -140,4 +141,29 @@ public class LastQueryOperator implements ProcessOperator {
   private int getEndIndex() {
     return currentIndex + Math.min(MAX_DETECT_COUNT, inputOperatorsCount - currentIndex);
   }
+
+  /**
+   * Estimates the peak memory of this operator.
+   *
+   * <p>The operator's own share is the larger of the default maximum TsBlock size and the
+   * current retained size of its builder; each child's peak is added to that share in turn.
+   *
+   * @return the largest such sum over all children, or {@code 0} when there are no children
+   */
+  @Override
+  public long calculateMaxPeekMemory() {
+    final long ownResultSize =
+        Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, tsBlockBuilder.getRetainedSizeInBytes());
+    long peak = 0;
+    for (Operator child : children) {
+      long candidate = ownResultSize + child.calculateMaxPeekMemory();
+      peak = Math.max(peak, candidate);
+    }
+    return peak;
+  }
+
+  /**
+   * Reports the largest result this operator can return.
+   *
+   * @return the larger of the default maximum TsBlock size and the builder's current retained
+   *     size
+   */
+  @Override
+  public long calculateMaxReturnSize() {
+    final long builderRetainedSize = tsBlockBuilder.getRetainedSizeInBytes();
+    return Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, builderRetainedSize);
+  }
+
+  /**
+   * Reports the memory still held once {@code next()} has returned.
+   *
+   * @return the sum of every child's retained size
+   */
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long totalRetained = 0;
+    for (Operator child : children) {
+      totalRetained = totalRetained + child.calculateRetainedSizeAfterCallingNext();
+    }
+    return totalRetained;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
index 2945856933..082173c29f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
 
 import static com.google.common.util.concurrent.Futures.successfulAsList;
 import static org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryUtil.compareTimeSeries;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 // collect all last query result in the same data region and sort them according to the
 // time-series's alphabetical order
@@ -190,6 +191,32 @@ public class LastQuerySortOperator implements ProcessOperator {
     return !hasNext();
   }
 
+  /**
+   * Estimates the peak memory of this operator.
+   *
+   * <p>The operator's own share is the default maximum TsBlock size plus the retained size of
+   * the cached TsBlock; each child's peak is added to that share in turn.
+   *
+   * @return the largest such sum over all children, or {@code 0} when there are no children
+   */
+  @Override
+  public long calculateMaxPeekMemory() {
+    final long ownShare =
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + cachedTsBlock.getRetainedSizeInBytes();
+    long peak = 0;
+    for (Operator child : children) {
+      long candidate = ownShare + child.calculateMaxPeekMemory();
+      peak = Math.max(peak, candidate);
+    }
+    return peak;
+  }
+
+  /**
+   * Reports the largest result this operator can return.
+   *
+   * @return the default maximum TsBlock size in bytes
+   */
+  @Override
+  public long calculateMaxReturnSize() {
+    final long maxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    return maxReturnSize;
+  }
+
+  /**
+   * Reports the memory still held once {@code next()} has returned.
+   *
+   * @return the cached TsBlock's retained size, plus the largest maximum return size among the
+   *     children, plus the sum of every child's retained size
+   */
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long largestChildReturnSize = 0;
+    long totalChildRetainedSize = 0;
+    for (Operator child : children) {
+      largestChildReturnSize = Math.max(largestChildReturnSize, child.calculateMaxReturnSize());
+      totalChildRetainedSize += child.calculateRetainedSizeAfterCallingNext();
+    }
+    final long childrenShare = largestChildReturnSize + totalChildRetainedSize;
+    return cachedTsBlock.getRetainedSizeInBytes() + childrenShare;
+  }
+
   private int getEndIndex() {
     return currentIndex + Math.min(MAX_DETECT_COUNT, inputOperatorsCount - currentIndex);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
index 93a5d81cc3..44e423424e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
@@ -131,4 +131,19 @@ public class UpdateLastCacheOperator implements ProcessOperator {
   public void close() throws Exception {
     child.close();
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return child.calculateMaxPeekMemory(); // pass-through: the child's estimate is reported as is
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return child.calculateMaxReturnSize(); // pass-through: same bound as the child
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() { // pass-through: only the child's share
+    return child.calculateRetainedSizeAfterCallingNext();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
index a9176d658c..7524b7c455 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
@@ -133,4 +133,33 @@ public class CountMergeOperator implements ProcessOperator {
   public boolean isFinished() {
     return isFinished;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    long peak = 0L;
+    for (Operator operator : children) {
+      peak = Math.max(operator.calculateMaxPeekMemory(), peak);
+    }
+    // the largest peak among the children; nothing is added for this operator itself
+    return peak;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    long largest = 0L;
+    for (Operator operator : children) {
+      largest = Math.max(operator.calculateMaxReturnSize(), largest);
+    }
+    // the largest return size among the children (0 when there are none)
+    return largest;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long total = 0L; // sum of what every child reports as retained
+    for (Operator operator : children) {
+      total += operator.calculateRetainedSizeAfterCallingNext();
+    }
+    return total;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java
index ca69fef36f..3e9bd02c02 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java
@@ -97,4 +97,21 @@ public class DevicesCountOperator implements SourceOperator {
   public boolean isFinished() {
     return isFinished;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    // 4 bytes: the integer used for count (NOTE(review): no object overhead counted — confirm)
+    return 4L;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    // 4 bytes: the integer used for count (NOTE(review): no object overhead counted — confirm)
+    return 4L;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L; // reports nothing as retained after next()
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
index 1630af74a2..aa5d865014 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
@@ -36,6 +36,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
 public class LevelTimeSeriesCountOperator implements SourceOperator {
   private final PlanNodeId sourceId;
   private final OperatorContext operatorContext;
@@ -124,4 +126,19 @@ public class LevelTimeSeriesCountOperator implements SourceOperator {
   public boolean isFinished() {
     return isFinished;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; // estimate: one default-sized TsBlock
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; // same bound as the peak estimate
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L; // reports nothing as retained after next()
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
index a4510ed53c..781ba46695 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
@@ -40,6 +40,7 @@ import java.util.TreeSet;
 import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 public class NodeManageMemoryMergeOperator implements ProcessOperator {
   private final OperatorContext operatorContext;
@@ -137,4 +138,21 @@ public class NodeManageMemoryMergeOperator implements ProcessOperator {
   public boolean isFinished() {
     return !isReadingMemory && child.isFinished();
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    // TODO: calculate the result based on all the scan nodes; currently this value is shadowed
+    // by schemaQueryMergeNode. For now: max(two default-sized TsBlocks, the child's peak).
+    return Math.max(2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, child.calculateMaxPeekMemory());
+  }
+
+  @Override
+  public long calculateMaxReturnSize() { // never below one default-sized TsBlock
+    return Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, child.calculateMaxReturnSize());
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() { // child's share + one default TsBlock
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + child.calculateRetainedSizeAfterCallingNext();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
index 76ec17b585..85bed1cf20 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
@@ -107,4 +107,19 @@ public class NodePathsConvertOperator implements ProcessOperator {
   public boolean isFinished() {
     return child.isFinished();
   }
+
+  @Override
+  public long calculateMaxPeekMemory() { // one maximum-sized child result on top of the child's peak
+    return child.calculateMaxReturnSize() + child.calculateMaxPeekMemory();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return child.calculateMaxReturnSize(); // same bound as the child
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() { // only the child's share is reported
+    return child.calculateRetainedSizeAfterCallingNext();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
index 79b3af2c41..6690848db1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
@@ -36,6 +36,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 public class NodePathsCountOperator implements ProcessOperator {
 
@@ -107,4 +108,21 @@ public class NodePathsCountOperator implements ProcessOperator {
   public boolean isFinished() {
     return isFinished;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    // TODO: calculate the result based on all the scan nodes; currently this value is shadowed
+    // by schemaQueryMergeNode. For now: max(two default-sized TsBlocks, the child's peak).
+    return Math.max(2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, child.calculateMaxPeekMemory());
+  }
+
+  @Override
+  public long calculateMaxReturnSize() { // never below one default-sized TsBlock
+    return Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, child.calculateMaxReturnSize());
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() { // child's share + one default TsBlock
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + child.calculateRetainedSizeAfterCallingNext();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java
index 1daade8dc3..6d1eea794e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java
@@ -38,6 +38,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
 public class NodePathsSchemaScanOperator implements SourceOperator {
   private final PlanNodeId sourceId;
 
@@ -128,4 +130,19 @@ public class NodePathsSchemaScanOperator implements SourceOperator {
   public PlanNodeId getSourceId() {
     return sourceId;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; // estimate: one default-sized TsBlock
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; // same bound as the peak estimate
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L; // reports nothing as retained after next()
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
index 92b023eabb..96ff8cbc11 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
@@ -122,4 +122,33 @@ public class SchemaFetchMergeOperator implements ProcessOperator {
         new BinaryColumn(
             1, Optional.empty(), new Binary[] {new Binary(outputStream.toByteArray())}));
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    long peak = 0L;
+    for (Operator operator : children) {
+      peak = Math.max(operator.calculateMaxPeekMemory(), peak);
+    }
+    // the largest peak among the children; nothing is added for this operator itself
+    return peak;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    long largest = 0L;
+    for (Operator operator : children) {
+      largest = Math.max(operator.calculateMaxReturnSize(), largest);
+    }
+    // the largest return size among the children (0 when there are none)
+    return largest;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long total = 0L; // sum of what every child reports as retained
+    for (Operator operator : children) {
+      total += operator.calculateRetainedSizeAfterCallingNext();
+    }
+    return total;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
index 3a22065879..4e07c4178c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
@@ -44,6 +44,8 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Optional;
 
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
 public class SchemaFetchScanOperator implements SourceOperator {
 
   private static final Logger logger = LoggerFactory.getLogger(SchemaFetchScanOperator.class);
@@ -55,7 +57,6 @@ public class SchemaFetchScanOperator implements SourceOperator {
 
   private final ISchemaRegion schemaRegion;
 
-  private TsBlock tsBlock;
   private boolean isFinished = false;
 
   public SchemaFetchScanOperator(
@@ -83,12 +84,11 @@ public class SchemaFetchScanOperator implements SourceOperator {
     }
     isFinished = true;
     try {
-      fetchSchema();
+      return fetchSchema();
     } catch (MetadataException e) {
       logger.error("Error occurred during execute SchemaFetchOperator {}", sourceId, e);
       throw new RuntimeException(e);
     }
-    return tsBlock;
   }
 
   @Override
@@ -106,7 +106,7 @@ public class SchemaFetchScanOperator implements SourceOperator {
     return sourceId;
   }
 
-  private void fetchSchema() throws MetadataException {
+  private TsBlock fetchSchema() throws MetadataException {
     ClusterSchemaTree schemaTree = new ClusterSchemaTree();
     List<PartialPath> partialPathList = patternTree.getAllPathPatterns();
     for (PartialPath path : partialPathList) {
@@ -122,10 +122,24 @@ public class SchemaFetchScanOperator implements SourceOperator {
     } catch (IOException e) {
       // Totally memory operation. This case won't happen.
     }
-    this.tsBlock =
-        new TsBlock(
-            new TimeColumn(1, new long[] {0}),
-            new BinaryColumn(
-                1, Optional.empty(), new Binary[] {new Binary(outputStream.toByteArray())}));
+    return new TsBlock(
+        new TimeColumn(1, new long[] {0}),
+        new BinaryColumn(
+            1, Optional.empty(), new Binary[] {new Binary(outputStream.toByteArray())}));
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; // estimate: one default-sized TsBlock
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; // same bound as the peak estimate
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L; // reports nothing as retained after next()
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
index 519cdce27e..f2671034e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
@@ -80,4 +80,33 @@ public class SchemaQueryMergeOperator implements ProcessOperator {
       child.close();
     }
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    long peak = 0L;
+    for (Operator operator : children) {
+      peak = Math.max(operator.calculateMaxPeekMemory(), peak);
+    }
+    // the largest peak among the children; nothing is added for this operator itself
+    return peak;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    long largest = 0L;
+    for (Operator operator : children) {
+      largest = Math.max(operator.calculateMaxReturnSize(), largest);
+    }
+    // the largest return size among the children (0 when there are none)
+    return largest;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long total = 0L; // sum of what every child reports as retained
+    for (Operator operator : children) {
+      total += operator.calculateRetainedSizeAfterCallingNext();
+    }
+    return total;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
index f52cbf62a4..eb3c1f4ec2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
@@ -186,4 +186,44 @@ public class SchemaQueryOrderByHeatOperator implements ProcessOperator {
   public boolean isFinished() {
     return isFinished;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    // candidate 1: one maximum-sized result from every child, all added together
+    long peak = 0L;
+    for (Operator operator : operators) {
+      peak += operator.calculateMaxReturnSize();
+    }
+    // candidate 2: the peak of any single child; the larger candidate wins
+    for (Operator operator : operators) {
+      peak = Math.max(operator.calculateMaxPeekMemory(), peak);
+    }
+
+    return peak;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    // sum, not max: the children's maximum return sizes are added together
+    long total = 0L;
+    for (Operator operator : operators) {
+      total += operator.calculateMaxReturnSize();
+    }
+
+    return total;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    // first the children's maximum return sizes ...
+    long total = 0L;
+    for (Operator operator : operators) {
+      total += operator.calculateMaxReturnSize();
+    }
+    // ... then whatever each child itself reports as retained
+    for (Operator operator : operators) {
+      total += operator.calculateRetainedSizeAfterCallingNext();
+    }
+    return total;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
index b318c5e7db..4493dc58a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
@@ -24,11 +24,13 @@ import org.apache.iotdb.db.mpp.execution.operator.source.SourceOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
 public abstract class SchemaQueryScanOperator implements SourceOperator {
 
   protected OperatorContext operatorContext;
   protected TsBlock tsBlock;
-  private boolean hasCachedTsBlock;
+  protected boolean isFinished = false;
 
   protected int limit;
   protected int offset;
@@ -85,19 +87,25 @@ public abstract class SchemaQueryScanOperator implements SourceOperator {
 
   @Override
   public TsBlock next() {
-    hasCachedTsBlock = false;
-    return tsBlock;
+    isFinished = true;
+    TsBlock result = tsBlock;
+    tsBlock = null;
+    return result;
   }
 
   @Override
   public boolean hasNext() {
+    if (isFinished) {
+      return false;
+    }
     if (tsBlock == null) {
       tsBlock = createTsBlock();
-      if (tsBlock.getPositionCount() > 0) {
-        hasCachedTsBlock = true;
+      if (tsBlock.getPositionCount() == 0) {
+        isFinished = true;
+        return false;
       }
     }
-    return hasCachedTsBlock;
+    return true;
   }
 
   @Override
@@ -109,4 +117,19 @@ public abstract class SchemaQueryScanOperator implements SourceOperator {
   public PlanNodeId getSourceId() {
     return sourceId;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; // estimate: one default-sized TsBlock
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; // same bound as the peak estimate
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L; // reports nothing as retained after next()
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java
index c87f2ee765..299961a561 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java
@@ -113,4 +113,21 @@ public class TimeSeriesCountOperator implements SourceOperator {
   public boolean isFinished() {
     return isFinished;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    // 4 bytes: the integer used for count (NOTE(review): no object overhead counted — confirm)
+    return 4L;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    // 4 bytes: the integer used for count (NOTE(review): no object overhead counted — confirm)
+    return 4L;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L; // reports nothing as retained after next()
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index 2d7d671be9..42ec93cfca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
@@ -40,7 +41,6 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateAggregationFromRawData;
-import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.isAllAggregatorsHasFinalResult;
 
 public abstract class AbstractSeriesAggregationScanOperator implements DataSourceOperator {
@@ -68,14 +68,19 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
 
   protected boolean finished = false;
 
+  private final long maxRetainedSize;
+  private final long maxReturnSize;
+
   public AbstractSeriesAggregationScanOperator(
       PlanNodeId sourceId,
       OperatorContext context,
       SeriesScanUtil seriesScanUtil,
       int subSensorSize,
       List<Aggregator> aggregators,
+      ITimeRangeIterator timeRangeIterator,
       boolean ascending,
-      GroupByTimeParameter groupByTimeParameter) {
+      GroupByTimeParameter groupByTimeParameter,
+      long maxReturnSize) {
     this.sourceId = sourceId;
     this.operatorContext = context;
     this.ascending = ascending;
@@ -83,14 +88,17 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
     this.seriesScanUtil = seriesScanUtil;
     this.subSensorSize = subSensorSize;
     this.aggregators = aggregators;
-
-    this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending, true);
+    this.timeRangeIterator = timeRangeIterator;
 
     List<TSDataType> dataTypes = new ArrayList<>();
     for (Aggregator aggregator : aggregators) {
       dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
     }
     this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+
+    this.maxRetainedSize =
+        (1L + subSensorSize) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    this.maxReturnSize = maxReturnSize;
   }
 
   @Override
@@ -108,6 +116,21 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
     seriesScanUtil.initQueryDataSource(dataSource);
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    return maxRetainedSize + maxReturnSize; // both values are fixed in the constructor
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return maxReturnSize; // the value handed to the constructor
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return maxRetainedSize; // (1 + subSensorSize) * configured page size, set in the constructor
+  }
+
   @Override
   public boolean hasNext() {
     return timeRangeIterator.hasNextTimeRange();
@@ -119,9 +142,6 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
     long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
     long start = System.nanoTime();
 
-    // reset operator state
-    resultTsBlockBuilder.reset();
-
     while (System.nanoTime() - start < maxRuntime
         && timeRangeIterator.hasNextTimeRange()
         && !resultTsBlockBuilder.isFull()) {
@@ -138,7 +158,9 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
     }
 
     if (resultTsBlockBuilder.getPositionCount() > 0) {
-      return resultTsBlockBuilder.build();
+      TsBlock resultTsBlock = resultTsBlockBuilder.build();
+      resultTsBlockBuilder.reset();
+      return resultTsBlock;
     } else {
       return null;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
index f9fa1f8d52..2ad3eb1a83 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.operator.source;
 
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
@@ -37,9 +38,11 @@ public class AlignedSeriesAggregationScanOperator extends AbstractSeriesAggregat
       AlignedPath seriesPath,
       OperatorContext context,
       List<Aggregator> aggregators,
+      ITimeRangeIterator timeRangeIterator,
       Filter timeFilter,
       boolean ascending,
-      GroupByTimeParameter groupByTimeParameter) {
+      GroupByTimeParameter groupByTimeParameter,
+      long maxReturnSize) {
     super(
         sourceId,
         context,
@@ -52,7 +55,9 @@ public class AlignedSeriesAggregationScanOperator extends AbstractSeriesAggregat
             ascending),
         seriesPath.getMeasurementList().size(),
         aggregators,
+        timeRangeIterator,
         ascending,
-        groupByTimeParameter);
+        groupByTimeParameter,
+        maxReturnSize);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
index c47ab9f95d..c64947e4b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
@@ -37,6 +38,8 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
   private boolean hasCachedTsBlock = false;
   private boolean finished = false;
 
+  private final long maxReturnSize;
+
   public AlignedSeriesScanOperator(
       PlanNodeId sourceId,
       AlignedPath seriesPath,
@@ -54,6 +57,10 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
             timeFilter,
             valueFilter,
             ascending);
+    // time + all value columns
+    this.maxReturnSize =
+        (1L + seriesPath.getMeasurementList().size())
+            * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
   }
 
   @Override
@@ -65,7 +72,9 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
   public TsBlock next() {
     if (hasCachedTsBlock || hasNext()) {
       hasCachedTsBlock = false;
-      return tsBlock;
+      TsBlock res = tsBlock;
+      tsBlock = null;
+      return res;
     }
     throw new IllegalStateException("no next batch");
   }
@@ -114,6 +123,21 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
     return finished || (finished = !hasNext());
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    return maxReturnSize; // peak estimate equals the return-size bound
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return maxReturnSize; // (1 + number of measurements) * configured page size
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L; // reports nothing as retained after next()
+  }
+
   private boolean readChunkData() throws IOException {
     while (seriesScanUtil.hasNextChunk()) {
       if (readPageData()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
index c72063caeb..110079809b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
 public class ExchangeOperator implements SourceOperator {
 
   private final OperatorContext operatorContext;
@@ -62,6 +64,21 @@ public class ExchangeOperator implements SourceOperator {
     return sourceHandle.isFinished();
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; // estimate: one default-sized TsBlock
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; // same bound as the peak estimate
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L; // reports nothing as retained after next()
+  }
+
   @Override
   public PlanNodeId getSourceId() {
     return sourceId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
index dfb6d82c5c..974758f8a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
@@ -57,6 +57,21 @@ public class LastCacheScanOperator implements SourceOperator {
     return !hasNext();
   }
 
+  @Override
+  public long calculateMaxPeekMemory() { // retained size of tsBlock; assumes non-null — TODO confirm
+    return tsBlock.getRetainedSizeInBytes();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() { // same value as the peak: the retained size of tsBlock
+    return tsBlock.getRetainedSizeInBytes();
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L; // reports nothing as retained after next()
+  }
+
   @Override
   public PlanNodeId getSourceId() {
     return sourceId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
index 157f51f2fa..99b24e06a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution.operator.source;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
@@ -44,9 +45,11 @@ public class SeriesAggregationScanOperator extends AbstractSeriesAggregationScan
       Set<String> allSensors,
       OperatorContext context,
       List<Aggregator> aggregators,
+      ITimeRangeIterator timeRangeIterator,
       Filter timeFilter,
       boolean ascending,
-      GroupByTimeParameter groupByTimeParameter) {
+      GroupByTimeParameter groupByTimeParameter,
+      long maxReturnSize) {
     super(
         sourceId,
         context,
@@ -60,7 +63,9 @@ public class SeriesAggregationScanOperator extends AbstractSeriesAggregationScan
             ascending),
         1,
         aggregators,
+        timeRangeIterator,
         ascending,
-        groupByTimeParameter);
+        groupByTimeParameter,
+        maxReturnSize);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
index f74d14a2f6..05685f758d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -38,6 +39,8 @@ public class SeriesScanOperator implements DataSourceOperator {
   private boolean hasCachedTsBlock = false;
   private boolean finished = false;
 
+  private final long maxReturnSize;
+
   public SeriesScanOperator(
       PlanNodeId sourceId,
       PartialPath seriesPath,
@@ -58,6 +61,7 @@ public class SeriesScanOperator implements DataSourceOperator {
             timeFilter,
             valueFilter,
             ascending);
+    this.maxReturnSize = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
   }
 
   @Override
@@ -69,7 +73,9 @@ public class SeriesScanOperator implements DataSourceOperator {
   public TsBlock next() {
     if (hasCachedTsBlock || hasNext()) {
       hasCachedTsBlock = false;
-      return tsBlock;
+      TsBlock res = tsBlock;
+      tsBlock = null;
+      return res;
     }
     throw new IllegalStateException("no next batch");
   }
@@ -118,6 +124,21 @@ public class SeriesScanOperator implements DataSourceOperator {
     return finished || (finished = !hasNext());
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L;
+  }
+
   private boolean readChunkData() throws IOException {
     while (seriesScanUtil.hasNextChunk()) {
       if (readPageData()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index af6b72e47a..22fd2cbabc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -618,7 +618,9 @@ public class SeriesScanUtil {
 
     if (hasCachedNextOverlappedPage) {
       hasCachedNextOverlappedPage = false;
-      return cachedTsBlock;
+      TsBlock res = cachedTsBlock;
+      cachedTsBlock = null;
+      return res;
     } else {
 
       /*
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/GroupByLevelController.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/GroupByLevelController.java
index 4f8da81f15..ce6dfa03da 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/GroupByLevelController.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/GroupByLevelController.java
@@ -28,6 +28,8 @@ import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.ArrayList;
@@ -84,8 +86,9 @@ public class GroupByLevelController {
     PartialPath rawPath = ((TimeSeriesOperand) expression.getExpressions().get(0)).getPath();
     PartialPath groupedPath = generatePartialPathByLevel(isCountStar, rawPath, levels);
 
-    checkDatatypeConsistency(
-        groupedPath.getFullPath(), ((FunctionExpression) expression).getFunctionName(), rawPath);
+    String functionName = ((FunctionExpression) expression).getFunctionName();
+    checkDatatypeConsistency(groupedPath.getFullPath(), functionName, rawPath);
+    updateTypeProvider(functionName, groupedPath.getFullPath(), rawPath);
 
     Expression rawPathExpression = new TimeSeriesOperand(rawPath);
     Expression groupedPathExpression = new TimeSeriesOperand(groupedPath);
@@ -122,9 +125,7 @@ public class GroupByLevelController {
       case SQLConstant.COUNT:
       case SQLConstant.AVG:
       case SQLConstant.SUM:
-        try {
-          typeProvider.getType(groupedPath);
-        } catch (StatementAnalyzeException e) {
+        if (!typeProvider.containsTypeInfoOf(groupedPath)) {
           typeProvider.setType(groupedPath, rawPath.getSeriesType());
         }
         return;
@@ -133,7 +134,9 @@ public class GroupByLevelController {
       case SQLConstant.FIRST_VALUE:
       case SQLConstant.MAX_VALUE:
       case SQLConstant.EXTREME:
-        try {
+        if (!typeProvider.containsTypeInfoOf(groupedPath)) {
+          typeProvider.setType(groupedPath, rawPath.getSeriesType());
+        } else {
           TSDataType tsDataType = typeProvider.getType(groupedPath);
           if (tsDataType != rawPath.getSeriesType()) {
             throw new SemanticException(
@@ -141,8 +144,6 @@ public class GroupByLevelController {
                     "GROUP BY LEVEL: the data types of the same output column[%s] should be the same.",
                     groupedPath));
           }
-        } catch (StatementAnalyzeException e) {
-          typeProvider.setType(groupedPath, rawPath.getSeriesType());
         }
         return;
       default:
@@ -224,4 +225,15 @@ public class GroupByLevelController {
   public Map<Expression, Expression> getRawPathToGroupedPathMap() {
     return rawPathToGroupedPathMap;
   }
+
+  /** Sets the type of each partial-aggregation column; case mapping uses Locale.ROOT. */
+  private void updateTypeProvider(String functionName, String groupedPath, PartialPath rawPath) {
+    AggregationType aggregation =
+        AggregationType.valueOf(functionName.toUpperCase(java.util.Locale.ROOT));
+    for (AggregationType splitType : SchemaUtils.splitPartialAggregation(aggregation)) {
+      String splitName = splitType.toString().toLowerCase(java.util.Locale.ROOT);
+      String outputColumn = String.format("%s(%s)", splitName, groupedPath);
+      typeProvider.setType(outputColumn, SchemaUtils.getSeriesTypeByPath(rawPath, splitName));
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/TypeProvider.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/TypeProvider.java
index b196d04a2b..6314102a5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/TypeProvider.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/TypeProvider.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.mpp.plan.analyze;
 
-import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
@@ -30,6 +29,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
+import static com.google.common.base.Preconditions.checkState;
+
 public class TypeProvider {
 
   private final Map<String, TSDataType> typeMap;
@@ -43,17 +44,26 @@ public class TypeProvider {
   }
 
+  /**
+   * Returns the data type registered for {@code path}.
+   *
+   * @throws IllegalStateException if no type is registered for {@code path}
+   */
   public TSDataType getType(String path) {
-    if (!typeMap.containsKey(path)) {
-      throw new StatementAnalyzeException(String.format("no data type found for path: %s", path));
-    }
+    // template overload: the message is only built when the check fails
+    checkState(typeMap.containsKey(path), "no data type found for path: %s", path);
     return typeMap.get(path);
   }
 
+  /**
+   * Registers {@code dataType} for {@code path}.
+   *
+   * @throws IllegalStateException if {@code path} already maps to a different type
+   */
   public void setType(String path, TSDataType dataType) {
-    if (typeMap.containsKey(path) && typeMap.get(path) != dataType) {
-      throw new StatementAnalyzeException(
-          String.format("inconsistent data type for path: %s", path));
-    }
+    checkState(
+        !typeMap.containsKey(path) || typeMap.get(path) == dataType,
+        "inconsistent data type for path: %s",
+        path);
     this.typeMap.put(path, dataType);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 8188612e61..f7ba31d0d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -18,19 +18,27 @@
  */
 package org.apache.iotdb.db.mpp.plan.planner;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
+import org.apache.iotdb.db.mpp.exception.MemoryNotEnoughException;
 import org.apache.iotdb.db.mpp.execution.driver.DataDriver;
 import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
 import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
 import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.timer.ITimeSliceAllocator;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
+import io.airlift.concurrent.SetThreadName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Used to plan a fragment instance. Currently, we simply change it from PlanNode to executable
  * Operator tree, but in the future, we may split one fragment instance into multiple pipeline to
@@ -38,6 +46,12 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
  */
 public class LocalExecutionPlanner {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(LocalExecutionPlanner.class);
+
+  /** remaining free memory for operator execution; only accessed under synchronized (this) */
+  private long freeMemoryForOperators =
+      IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForOperators();
+
   public static LocalExecutionPlanner getInstance() {
     return InstanceHolder.INSTANCE;
   }
@@ -47,11 +61,15 @@ public class LocalExecutionPlanner {
       TypeProvider types,
       FragmentInstanceContext instanceContext,
       Filter timeFilter,
-      DataRegion dataRegion) {
+      DataRegion dataRegion)
+      throws MemoryNotEnoughException {
     LocalExecutionPlanContext context = new LocalExecutionPlanContext(types, instanceContext);
 
     Operator root = plan.accept(new OperatorTreeGenerator(), context);
 
+    // check whether current free memory is enough to execute current query
+    checkMemory(root, instanceContext.getStateMachine());
+
     ITimeSliceAllocator timeSliceAllocator = context.getTimeSliceAllocator();
     instanceContext
         .getOperatorContexts()
@@ -71,7 +89,8 @@ public class LocalExecutionPlanner {
   }
 
   public SchemaDriver plan(
-      PlanNode plan, FragmentInstanceContext instanceContext, ISchemaRegion schemaRegion) {
+      PlanNode plan, FragmentInstanceContext instanceContext, ISchemaRegion schemaRegion)
+      throws MemoryNotEnoughException {
 
     SchemaDriverContext schemaDriverContext =
         new SchemaDriverContext(instanceContext, schemaRegion);
@@ -81,6 +100,9 @@ public class LocalExecutionPlanner {
 
     Operator root = plan.accept(new OperatorTreeGenerator(), context);
 
+    // check whether current free memory is enough to execute current query
+    checkMemory(root, instanceContext.getStateMachine());
+
     ITimeSliceAllocator timeSliceAllocator = context.getTimeSliceAllocator();
     instanceContext
         .getOperatorContexts()
@@ -91,6 +113,49 @@ public class LocalExecutionPlanner {
     return new SchemaDriver(root, context.getSinkHandle(), schemaDriverContext);
   }
 
+  /**
+   * Reserves the estimated peak memory of the operator tree under {@code root} and gives it back
+   * once the fragment instance reaches a done state.
+   *
+   * @throws MemoryNotEnoughException if the estimate exceeds the remaining free memory
+   */
+  private void checkMemory(Operator root, FragmentInstanceStateMachine stateMachine)
+      throws MemoryNotEnoughException {
+    // nothing is reserved when query memory estimation is disabled in the configuration
+    if (!IoTDBDescriptor.getInstance().getConfig().isEnableQueryMemoryEstimation()) {
+      return;
+    }
+    long estimatedMemorySize = root.calculateMaxPeekMemory();
+    synchronized (this) {
+      if (estimatedMemorySize > freeMemoryForOperators) {
+        throw new MemoryNotEnoughException(
+            String.format(
+                "There is not enough memory to execute current fragment instance, current remaining free memory is %d, estimated memory usage for current fragment instance is %d",
+                freeMemoryForOperators, estimatedMemorySize),
+            TSStatusCode.MEMORY_NOT_ENOUGH.getStatusCode());
+      }
+      freeMemoryForOperators -= estimatedMemorySize;
+      LOGGER.info(
+          "consume memory: {}, current remaining memory: {}",
+          estimatedMemorySize, freeMemoryForOperators);
+    }
+
+    stateMachine.addStateChangeListener(
+        newState -> {
+          if (newState.isDone()) {
+            try (SetThreadName fragmentInstanceName =
+                new SetThreadName(stateMachine.getFragmentInstanceId().getFullId())) {
+              synchronized (this) {
+                this.freeMemoryForOperators += estimatedMemorySize;
+                LOGGER.info(
+                    "release memory: {}, current remaining memory: {}",
+                    estimatedMemorySize, freeMemoryForOperators);
+              }
+            }
+          }
+        });
+  }
+
   private static class InstanceHolder {
 
     private InstanceHolder() {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index 8ec8817486..589cc3d95e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -471,9 +471,8 @@ public class LogicalPlanBuilder {
         constructAggregationDescriptorList(aggregationExpressions, curStep);
     if (curStep.isOutputPartial()) {
       aggregationDescriptorList.forEach(
-          aggregationDescriptor -> {
-            updateTypeProviderByPartialAggregation(aggregationDescriptor, typeProvider);
-          });
+          aggregationDescriptor ->
+              updateTypeProviderByPartialAggregation(aggregationDescriptor, typeProvider));
     }
     this.root =
         new AggregationNode(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 214d024741..07a8e37051 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -27,12 +27,14 @@ import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.aggregation.slidingwindow.SlidingWindowAggregatorFactory;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
 import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
 import org.apache.iotdb.db.mpp.execution.exchange.ISourceHandle;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
+import org.apache.iotdb.db.mpp.execution.operator.AggregationUtil;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.operator.process.AggregationOperator;
@@ -150,6 +152,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OutputColumn;
 import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
@@ -185,6 +188,9 @@ import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateMaxAggregationResultSize;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateMaxAggregationResultSizeForLastQuery;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 import static org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode;
 
 /** This Visitor is responsible for transferring PlanNode Tree to Operator Tree */
@@ -272,6 +278,55 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     return seriesScanOperator;
   }
 
+  @Override
+  public Operator visitSeriesAggregationScan(
+      SeriesAggregationScanNode node, LocalExecutionPlanContext context) {
+    PartialPath seriesPath = node.getSeriesPath();
+    boolean ascending = node.getScanOrder() == Ordering.ASC;
+    OperatorContext operatorContext =
+        context
+            .getInstanceContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                SeriesAggregationScanOperator.class.getSimpleName());
+
+    List<AggregationDescriptor> aggregationDescriptors = node.getAggregationDescriptorList();
+    List<Aggregator> aggregators = new ArrayList<>();
+    aggregationDescriptors.forEach(
+        o ->
+            aggregators.add(
+                new Aggregator(
+                    AccumulatorFactory.createAccumulator(
+                        o.getAggregationType(), node.getSeriesPath().getSeriesType(), ascending),
+                    o.getStep())));
+
+    GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+    ITimeRangeIterator timeRangeIterator =
+        initTimeRangeIterator(groupByTimeParameter, ascending, true);
+    long maxReturnSize =
+        AggregationUtil.calculateMaxAggregationResultSize(
+            node.getAggregationDescriptorList(), timeRangeIterator, context.getTypeProvider());
+
+    SeriesAggregationScanOperator aggregateScanOperator =
+        new SeriesAggregationScanOperator(
+            node.getPlanNodeId(),
+            seriesPath,
+            context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()),
+            operatorContext,
+            aggregators,
+            timeRangeIterator,
+            node.getTimeFilter(),
+            ascending,
+            node.getGroupByTimeParameter(),
+            maxReturnSize);
+
+    context.addSourceOperator(aggregateScanOperator);
+    context.addPath(seriesPath);
+    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
+    return aggregateScanOperator;
+  }
+
   @Override
   public Operator visitAlignedSeriesAggregationScan(
       AlignedSeriesAggregationScanNode node, LocalExecutionPlanContext context) {
@@ -307,15 +362,24 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
               Collections.singletonList(new InputLocation[] {new InputLocation(0, seriesIndex)})));
     }
 
+    GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+    ITimeRangeIterator timeRangeIterator =
+        initTimeRangeIterator(groupByTimeParameter, ascending, true);
+    long maxReturnSize =
+        AggregationUtil.calculateMaxAggregationResultSize(
+            node.getAggregationDescriptorList(), timeRangeIterator, context.getTypeProvider());
+
     AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
         new AlignedSeriesAggregationScanOperator(
             node.getPlanNodeId(),
             seriesPath,
             operatorContext,
             aggregators,
+            timeRangeIterator,
             node.getTimeFilter(),
             ascending,
-            node.getGroupByTimeParameter());
+            groupByTimeParameter,
+            maxReturnSize);
 
     context.addSourceOperator(seriesAggregationScanOperator);
     context.addPath(seriesPath);
@@ -555,47 +619,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     return new NodePathsCountOperator(operatorContext, child);
   }
 
-  @Override
-  public Operator visitSeriesAggregationScan(
-      SeriesAggregationScanNode node, LocalExecutionPlanContext context) {
-    PartialPath seriesPath = node.getSeriesPath();
-    boolean ascending = node.getScanOrder() == Ordering.ASC;
-    OperatorContext operatorContext =
-        context
-            .getInstanceContext()
-            .addOperatorContext(
-                context.getNextOperatorId(),
-                node.getPlanNodeId(),
-                SeriesAggregationScanOperator.class.getSimpleName());
-
-    List<Aggregator> aggregators = new ArrayList<>();
-    node.getAggregationDescriptorList()
-        .forEach(
-            o ->
-                aggregators.add(
-                    new Aggregator(
-                        AccumulatorFactory.createAccumulator(
-                            o.getAggregationType(),
-                            node.getSeriesPath().getSeriesType(),
-                            ascending),
-                        o.getStep())));
-    SeriesAggregationScanOperator aggregateScanOperator =
-        new SeriesAggregationScanOperator(
-            node.getPlanNodeId(),
-            seriesPath,
-            context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()),
-            operatorContext,
-            aggregators,
-            node.getTimeFilter(),
-            ascending,
-            node.getGroupByTimeParameter());
-
-    context.addSourceOperator(aggregateScanOperator);
-    context.addPath(seriesPath);
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
-    return aggregateScanOperator;
-  }
-
   @Override
   public Operator visitDeviceView(DeviceViewNode node, LocalExecutionPlanContext context) {
     OperatorContext operatorContext =
@@ -1016,7 +1039,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     boolean ascending = node.getScanOrder() == Ordering.ASC;
     List<Aggregator> aggregators = new ArrayList<>();
     Map<String, List<InputLocation>> layout = makeLayout(node);
-    for (GroupByLevelDescriptor descriptor : node.getGroupByLevelDescriptors()) {
+    List<GroupByLevelDescriptor> aggregationDescriptors = node.getGroupByLevelDescriptors();
+    for (GroupByLevelDescriptor descriptor : aggregationDescriptors) {
       List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout);
       TSDataType seriesDataType =
           context
@@ -1038,9 +1062,16 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                 node.getPlanNodeId(),
                 AggregationOperator.class.getSimpleName());
 
+    GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+    ITimeRangeIterator timeRangeIterator =
+        initTimeRangeIterator(groupByTimeParameter, ascending, false);
+    long maxReturnSize =
+        calculateMaxAggregationResultSize(
+            aggregationDescriptors, timeRangeIterator, context.getTypeProvider());
+
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
     return new AggregationOperator(
-        operatorContext, aggregators, children, ascending, node.getGroupByTimeParameter(), false);
+        operatorContext, aggregators, timeRangeIterator, children, maxReturnSize);
   }
 
   @Override
@@ -1060,7 +1091,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     boolean ascending = node.getScanOrder() == Ordering.ASC;
     List<Aggregator> aggregators = new ArrayList<>();
     Map<String, List<InputLocation>> layout = makeLayout(node);
-    for (AggregationDescriptor descriptor : node.getAggregationDescriptorList()) {
+    List<AggregationDescriptor> aggregationDescriptors = node.getAggregationDescriptorList();
+    for (AggregationDescriptor descriptor : aggregationDescriptors) {
       List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout);
       aggregators.add(
           SlidingWindowAggregatorFactory.createSlidingWindowAggregator(
@@ -1074,9 +1106,22 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
               descriptor.getStep()));
     }
 
+    GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+    ITimeRangeIterator timeRangeIterator =
+        initTimeRangeIterator(groupByTimeParameter, ascending, false);
+    long maxReturnSize =
+        calculateMaxAggregationResultSize(
+            aggregationDescriptors, timeRangeIterator, context.getTypeProvider());
+
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
     return new SlidingWindowAggregationOperator(
-        operatorContext, aggregators, child, ascending, node.getGroupByTimeParameter());
+        operatorContext,
+        aggregators,
+        timeRangeIterator,
+        child,
+        ascending,
+        groupByTimeParameter,
+        maxReturnSize);
   }
 
   @Override
@@ -1121,6 +1166,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     boolean ascending = node.getScanOrder() == Ordering.ASC;
     List<Aggregator> aggregators = new ArrayList<>();
     Map<String, List<InputLocation>> layout = makeLayout(node);
+    List<AggregationDescriptor> aggregationDescriptors = node.getAggregationDescriptorList();
     for (AggregationDescriptor descriptor : node.getAggregationDescriptorList()) {
       List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout);
       aggregators.add(
@@ -1136,6 +1182,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
               inputLocationList));
     }
     boolean inputRaw = node.getAggregationDescriptorList().get(0).getStep().isInputRaw();
+    GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+
     if (inputRaw) {
       checkArgument(children.size() == 1, "rawDataAggregateOperator can only accept one input");
       OperatorContext operatorContext =
@@ -1146,8 +1194,20 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                   node.getPlanNodeId(),
                   RawDataAggregationOperator.class.getSimpleName());
       context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
+
+      ITimeRangeIterator timeRangeIterator =
+          initTimeRangeIterator(groupByTimeParameter, ascending, true);
+      long maxReturnSize =
+          calculateMaxAggregationResultSize(
+              aggregationDescriptors, timeRangeIterator, context.getTypeProvider());
+
       return new RawDataAggregationOperator(
-          operatorContext, aggregators, children.get(0), ascending, node.getGroupByTimeParameter());
+          operatorContext,
+          aggregators,
+          timeRangeIterator,
+          children.get(0),
+          ascending,
+          maxReturnSize);
     } else {
       OperatorContext operatorContext =
           context
@@ -1156,9 +1216,16 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                   context.getNextOperatorId(),
                   node.getPlanNodeId(),
                   AggregationOperator.class.getSimpleName());
+
+      ITimeRangeIterator timeRangeIterator =
+          initTimeRangeIterator(groupByTimeParameter, ascending, true);
+      long maxReturnSize =
+          calculateMaxAggregationResultSize(
+              aggregationDescriptors, timeRangeIterator, context.getTypeProvider());
+
       context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
       return new AggregationOperator(
-          operatorContext, aggregators, children, ascending, node.getGroupByTimeParameter(), true);
+          operatorContext, aggregators, timeRangeIterator, children, maxReturnSize);
     }
   }
 
@@ -1400,6 +1467,10 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
 
     // last_time, last_value
     List<Aggregator> aggregators = LastQueryUtil.createAggregators(seriesPath.getSeriesType());
+    ITimeRangeIterator timeRangeIterator = initTimeRangeIterator(null, false, false);
+    long maxReturnSize =
+        calculateMaxAggregationResultSizeForLastQuery(
+            aggregators, seriesPath.transformToPartialPath());
 
     SeriesAggregationScanOperator seriesAggregationScanOperator =
         new SeriesAggregationScanOperator(
@@ -1408,9 +1479,11 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
             context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()),
             operatorContext,
             aggregators,
+            timeRangeIterator,
             context.getLastQueryTimeFilter(),
             false,
-            null);
+            null,
+            maxReturnSize);
     context.addSourceOperator(seriesAggregationScanOperator);
     context.addPath(seriesPath);
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
@@ -1479,15 +1552,22 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     // last_time, last_value
     List<Aggregator> aggregators =
         LastQueryUtil.createAggregators(seriesPath.getSchemaList().get(0).getType());
+    ITimeRangeIterator timeRangeIterator = initTimeRangeIterator(null, false, false);
+    long maxReturnSize =
+        calculateMaxAggregationResultSizeForLastQuery(
+            aggregators, seriesPath.transformToPartialPath());
+
     AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
         new AlignedSeriesAggregationScanOperator(
             node.getPlanNodeId(),
             seriesPath,
             operatorContext,
             aggregators,
+            timeRangeIterator,
             context.getLastQueryTimeFilter(),
             false,
-            null);
+            null,
+            maxReturnSize);
     context.addSourceOperator(seriesAggregationScanOperator);
     context.addPath(seriesPath);
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/StatisticsManager.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/statistics/StatisticsManager.java
index d0eff60394..44d5fc1c66 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/StatisticsManager.java
@@ -17,27 +17,30 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.execution.memory;
+package org.apache.iotdb.db.mpp.statistics;
 
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.commons.path.PartialPath;
 
-/**
- * Manages memory of a data node. The memory is divided into two memory pools so that the memory for
- * read and for write can be isolated.
- */
-public class LocalMemoryManager {
+import com.google.common.collect.Maps;
+
+import java.util.Map;
 
-  private final MemoryPool queryPool;
+public class StatisticsManager {
 
-  public LocalMemoryManager() {
-    queryPool =
-        new MemoryPool(
-            "query",
-            IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForRead(),
-            (long) (IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForRead() * 0.5));
+  private final Map<PartialPath, TimeseriesStats> seriesToStatsMap = Maps.newConcurrentMap();
+
+  public long getMaxBinarySizeInBytes(PartialPath path) {
+    return 512 * Byte.BYTES; // constant 512 bytes for every series; `path` is not consulted here
+  }
 
-  public MemoryPool getQueryPool() {
-    return queryPool;
+  public static StatisticsManager getInstance() {
+    return StatisticsManager.StatisticsManagerHelper.INSTANCE; // shared instance held by the helper
+  }
+
+  private static class StatisticsManagerHelper {
+
+    private static final StatisticsManager INSTANCE = new StatisticsManager();
+
+    private StatisticsManagerHelper() {}
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/TimeseriesStats.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/statistics/TimeseriesStats.java
index d0eff60394..509341d3d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/TimeseriesStats.java
@@ -17,27 +17,8 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.execution.memory;
+package org.apache.iotdb.db.mpp.statistics;
 
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-
-/**
- * Manages memory of a data node. The memory is divided into two memory pools so that the memory for
- * read and for write can be isolated.
- */
-public class LocalMemoryManager {
-
-  private final MemoryPool queryPool;
-
-  public LocalMemoryManager() {
-    queryPool =
-        new MemoryPool(
-            "query",
-            IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForRead(),
-            (long) (IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForRead() * 0.5));
-  }
-
-  public MemoryPool getQueryPool() {
-    return queryPool;
-  }
+public class TimeseriesStats {
+  // TODO: collect per-time-series statistics; this class declares no members yet
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/multi/MappableUDFColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/multi/MappableUDFColumnTransformer.java
index 87cecc817a..d9693aa671 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/multi/MappableUDFColumnTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/multi/MappableUDFColumnTransformer.java
@@ -88,4 +88,8 @@ public class MappableUDFColumnTransformer extends ColumnTransformer {
   protected void checkType() {
     // do nothing
   }
+
+  public ColumnTransformer[] getInputColumnTransformers() {
+    return inputColumnTransformers;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ternary/TernaryColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ternary/TernaryColumnTransformer.java
index 2032215b0f..096cef4d5e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ternary/TernaryColumnTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ternary/TernaryColumnTransformer.java
@@ -41,4 +41,16 @@ public abstract class TernaryColumnTransformer extends ColumnTransformer {
     this.thirdColumnTransformer = thirdColumnTransformer;
     checkType();
   }
+
+  public ColumnTransformer getFirstColumnTransformer() {
+    return firstColumnTransformer;
+  }
+
+  public ColumnTransformer getSecondColumnTransformer() {
+    return secondColumnTransformer;
+  }
+
+  public ColumnTransformer getThirdColumnTransformer() {
+    return thirdColumnTransformer;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/UnaryColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/UnaryColumnTransformer.java
index d84b538cc6..c0682169b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/UnaryColumnTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/UnaryColumnTransformer.java
@@ -42,6 +42,10 @@ public abstract class UnaryColumnTransformer extends ColumnTransformer {
     initializeColumnCache(columnBuilder.build());
   }
 
+  public ColumnTransformer getChildColumnTransformer() {
+    return childColumnTransformer;
+  }
+
   @Override
   protected void checkType() {
     // do nothing
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/TsFileResourceManager.java b/server/src/main/java/org/apache/iotdb/db/rescon/TsFileResourceManager.java
index ae6ab41567..b20e84216e 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/TsFileResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/TsFileResourceManager.java
@@ -35,8 +35,7 @@ public class TsFileResourceManager {
   private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
 
   /** threshold total memory for all TimeIndex */
-  private double TIME_INDEX_MEMORY_THRESHOLD =
-      CONFIG.getAllocateMemoryForRead() * CONFIG.getTimeIndexMemoryProportion();
+  private double TIME_INDEX_MEMORY_THRESHOLD = CONFIG.getAllocateMemoryForTimeIndex();
 
   /** store the sealed TsFileResource, sorted by priority of TimeIndex */
   private final TreeSet<TsFileResource> sealedTsFileResources =
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TimeSelector.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TimeSelector.java
index 6233fa04ce..e25f592aa8 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TimeSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TimeSelector.java
@@ -166,6 +166,11 @@ public class TimeSelector {
     return smallerChildIndex;
   }
 
+  public void clear() {
+    heapSize = 0; // only the size counter is reset; the timeHeap array itself is not touched here
+    lastTime = Long.MIN_VALUE; // back to the Long.MIN_VALUE sentinel
+  }
+
   @Override
   public String toString() {
     return Arrays.toString(this.timeHeap);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java
index 88e4b6bf16..1d3f3ccf79 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java
@@ -295,6 +295,8 @@ public class TimeRangeIteratorTest {
   }
 
   private void checkRes(ITimeRangeIterator timeRangeIterator, String[] res) {
+    Assert.assertEquals(res.length, timeRangeIterator.getTotalIntervalNum());
+
     boolean isAscending = timeRangeIterator.isAscending();
     int cnt = isAscending ? 0 : res.length - 1;
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
index 233965aa3e..85e65849a6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
@@ -61,6 +61,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static org.junit.Assert.assertEquals;
 
 public class AggregationOperatorTest {
@@ -319,9 +321,11 @@ public class AggregationOperatorTest {
             Collections.singleton("sensor0"),
             fragmentInstanceContext.getOperatorContexts().get(0),
             aggregators,
+            initTimeRangeIterator(groupByTimeParameter, true, true),
             null,
             true,
-            groupByTimeParameter);
+            groupByTimeParameter,
+            DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
     List<TsFileResource> seqResources1 = new ArrayList<>();
     List<TsFileResource> unSeqResources1 = new ArrayList<>();
     seqResources1.add(seqResources.get(0));
@@ -341,9 +345,11 @@ public class AggregationOperatorTest {
             Collections.singleton("sensor0"),
             fragmentInstanceContext.getOperatorContexts().get(1),
             aggregators,
+            initTimeRangeIterator(groupByTimeParameter, true, true),
             null,
             true,
-            groupByTimeParameter);
+            groupByTimeParameter,
+            DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
     List<TsFileResource> seqResources2 = new ArrayList<>();
     List<TsFileResource> unSeqResources2 = new ArrayList<>();
     seqResources2.add(seqResources.get(2));
@@ -368,9 +374,8 @@ public class AggregationOperatorTest {
     return new AggregationOperator(
         fragmentInstanceContext.getOperatorContexts().get(2),
         finalAggregators,
+        initTimeRangeIterator(groupByTimeParameter, true, true),
         children,
-        true,
-        groupByTimeParameter,
-        true);
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
index 2277299102..a2e989f47e 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
@@ -63,6 +63,8 @@ import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -621,10 +623,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
         1, planNodeId, SeriesScanOperator.class.getSimpleName());
     fragmentInstanceContext
         .getOperatorContexts()
-        .forEach(
-            operatorContext -> {
-              operatorContext.setMaxRunTime(TEST_TIME_SLICE);
-            });
+        .forEach(operatorContext -> operatorContext.setMaxRunTime(TEST_TIME_SLICE));
 
     AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
         new AlignedSeriesAggregationScanOperator(
@@ -632,9 +631,11 @@ public class AlignedSeriesAggregationScanOperatorTest {
             alignedPath,
             fragmentInstanceContext.getOperatorContexts().get(0),
             aggregators,
+            initTimeRangeIterator(groupByTimeParameter, ascending, true),
             timeFilter,
             ascending,
-            groupByTimeParameter);
+            groupByTimeParameter,
+            DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
     seriesAggregationScanOperator.initQueryDataSource(
         new QueryDataSource(seqResources, unSeqResources));
     return seriesAggregationScanOperator;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/FillOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/FillOperatorTest.java
index 5cbe7b189f..432967bb66 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/FillOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/FillOperatorTest.java
@@ -129,6 +129,21 @@ public class FillOperatorTest {
                 public boolean isFinished() {
                   return index >= 3;
                 }
+
+                @Override
+                public long calculateMaxPeekMemory() {
+                  return 0;
+                }
+
+                @Override
+                public long calculateMaxReturnSize() {
+                  return 0;
+                }
+
+                @Override
+                public long calculateRetainedSizeAfterCallingNext() {
+                  return 0;
+                }
               });
 
       int count = 0;
@@ -283,6 +298,21 @@ public class FillOperatorTest {
                 public boolean isFinished() {
                   return index >= 3;
                 }
+
+                @Override
+                public long calculateMaxPeekMemory() {
+                  return 0;
+                }
+
+                @Override
+                public long calculateMaxReturnSize() {
+                  return 0;
+                }
+
+                @Override
+                public long calculateRetainedSizeAfterCallingNext() {
+                  return 0;
+                }
               });
 
       int count = 0;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
index e44f6ce2b9..38d244a13f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
@@ -126,6 +126,21 @@ public class LastQueryMergeOperatorTest {
           public boolean isFinished() {
             return !hasNext();
           }
+
+          @Override
+          public long calculateMaxPeekMemory() {
+            return 0;
+          }
+
+          @Override
+          public long calculateMaxReturnSize() {
+            return 0;
+          }
+
+          @Override
+          public long calculateRetainedSizeAfterCallingNext() {
+            return 0;
+          }
         };
 
     Operator operator2 =
@@ -175,6 +190,21 @@ public class LastQueryMergeOperatorTest {
           public boolean isFinished() {
             return !hasNext();
           }
+
+          @Override
+          public long calculateMaxPeekMemory() {
+            return 0;
+          }
+
+          @Override
+          public long calculateMaxReturnSize() {
+            return 0;
+          }
+
+          @Override
+          public long calculateRetainedSizeAfterCallingNext() {
+            return 0;
+          }
         };
 
     LastQueryMergeOperator lastQueryMergeOperator =
@@ -292,6 +322,21 @@ public class LastQueryMergeOperatorTest {
           public boolean isFinished() {
             return !hasNext();
           }
+
+          @Override
+          public long calculateMaxPeekMemory() {
+            return 0;
+          }
+
+          @Override
+          public long calculateMaxReturnSize() {
+            return 0;
+          }
+
+          @Override
+          public long calculateRetainedSizeAfterCallingNext() {
+            return 0;
+          }
         };
 
     Operator operator2 =
@@ -341,6 +386,21 @@ public class LastQueryMergeOperatorTest {
           public boolean isFinished() {
             return !hasNext();
           }
+
+          @Override
+          public long calculateMaxPeekMemory() {
+            return 0;
+          }
+
+          @Override
+          public long calculateMaxReturnSize() {
+            return 0;
+          }
+
+          @Override
+          public long calculateRetainedSizeAfterCallingNext() {
+            return 0;
+          }
         };
 
     LastQueryMergeOperator lastQueryMergeOperator =
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
index d914ec3ee7..4864fed39b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
@@ -56,6 +56,8 @@ import java.util.concurrent.ExecutorService;
 
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -133,9 +135,11 @@ public class LastQueryOperatorTest {
               allSensors,
               fragmentInstanceContext.getOperatorContexts().get(0),
               aggregators1,
+              initTimeRangeIterator(null, false, true),
               null,
               false,
-              null);
+              null,
+              DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
       seriesAggregationScanOperator1.initQueryDataSource(
           new QueryDataSource(seqResources, unSeqResources));
 
@@ -155,9 +159,11 @@ public class LastQueryOperatorTest {
               allSensors,
               fragmentInstanceContext.getOperatorContexts().get(2),
               aggregators2,
+              initTimeRangeIterator(null, false, true),
               null,
               false,
-              null);
+              null,
+              DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
       seriesAggregationScanOperator2.initQueryDataSource(
           new QueryDataSource(seqResources, unSeqResources));
 
@@ -252,9 +258,11 @@ public class LastQueryOperatorTest {
               allSensors,
               fragmentInstanceContext.getOperatorContexts().get(0),
               aggregators1,
+              initTimeRangeIterator(null, false, true),
               null,
               false,
-              null);
+              null,
+              DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
       seriesAggregationScanOperator1.initQueryDataSource(
           new QueryDataSource(seqResources, unSeqResources));
 
@@ -274,9 +282,11 @@ public class LastQueryOperatorTest {
               allSensors,
               fragmentInstanceContext.getOperatorContexts().get(2),
               aggregators2,
+              initTimeRangeIterator(null, false, true),
               null,
               false,
-              null);
+              null,
+              DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
       seriesAggregationScanOperator2.initQueryDataSource(
           new QueryDataSource(seqResources, unSeqResources));
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java
index 2785f1d803..6dc84c071b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java
@@ -58,6 +58,8 @@ import java.util.concurrent.ExecutorService;
 
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -135,9 +137,11 @@ public class LastQuerySortOperatorTest {
               allSensors,
               fragmentInstanceContext.getOperatorContexts().get(0),
               aggregators1,
+              initTimeRangeIterator(null, false, true),
               null,
               false,
-              null);
+              null,
+              DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
       seriesAggregationScanOperator1.initQueryDataSource(
           new QueryDataSource(seqResources, unSeqResources));
 
@@ -157,9 +161,11 @@ public class LastQuerySortOperatorTest {
               allSensors,
               fragmentInstanceContext.getOperatorContexts().get(2),
               aggregators2,
+              initTimeRangeIterator(null, false, true),
               null,
               false,
-              null);
+              null,
+              DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
       seriesAggregationScanOperator2.initQueryDataSource(
           new QueryDataSource(seqResources, unSeqResources));
 
@@ -255,9 +261,11 @@ public class LastQuerySortOperatorTest {
               allSensors,
               fragmentInstanceContext.getOperatorContexts().get(0),
               aggregators1,
+              initTimeRangeIterator(null, false, true),
               null,
               false,
-              null);
+              null,
+              DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
       seriesAggregationScanOperator1.initQueryDataSource(
           new QueryDataSource(seqResources, unSeqResources));
 
@@ -277,9 +285,11 @@ public class LastQuerySortOperatorTest {
               allSensors,
               fragmentInstanceContext.getOperatorContexts().get(2),
               aggregators2,
+              initTimeRangeIterator(null, false, true),
               null,
               false,
-              null);
+              null,
+              DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
       seriesAggregationScanOperator2.initQueryDataSource(
           new QueryDataSource(seqResources, unSeqResources));
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
index 56c2fe6a1a..733694d4d5 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
@@ -162,6 +162,21 @@ public class LinearFillOperatorTest {
                 public boolean isFinished() {
                   return index >= 3;
                 }
+
+                @Override
+                public long calculateMaxPeekMemory() {
+                  return 0;
+                }
+
+                @Override
+                public long calculateMaxReturnSize() {
+                  return 0;
+                }
+
+                @Override
+                public long calculateRetainedSizeAfterCallingNext() {
+                  return 0;
+                }
               });
 
       int count = 0;
@@ -360,6 +375,21 @@ public class LinearFillOperatorTest {
                 public boolean isFinished() {
                   return index >= 3;
                 }
+
+                @Override
+                public long calculateMaxPeekMemory() {
+                  return 0;
+                }
+
+                @Override
+                public long calculateMaxReturnSize() {
+                  return 0;
+                }
+
+                @Override
+                public long calculateRetainedSizeAfterCallingNext() {
+                  return 0;
+                }
               });
 
       float[][][] res =
@@ -558,6 +588,21 @@ public class LinearFillOperatorTest {
                 public boolean isFinished() {
                   return index >= 3;
                 }
+
+                @Override
+                public long calculateMaxPeekMemory() {
+                  return 0;
+                }
+
+                @Override
+                public long calculateMaxReturnSize() {
+                  return 0;
+                }
+
+                @Override
+                public long calculateRetainedSizeAfterCallingNext() {
+                  return 0;
+                }
               });
 
       int count = 0;
@@ -756,6 +801,21 @@ public class LinearFillOperatorTest {
                 public boolean isFinished() {
                   return index >= 3;
                 }
+
+                @Override
+                public long calculateMaxPeekMemory() {
+                  return 0;
+                }
+
+                @Override
+                public long calculateMaxReturnSize() {
+                  return 0;
+                }
+
+                @Override
+                public long calculateRetainedSizeAfterCallingNext() {
+                  return 0;
+                }
               });
 
       int count = 0;
@@ -902,6 +962,21 @@ public class LinearFillOperatorTest {
                 public boolean isFinished() {
                   return index >= 7;
                 }
+
+                @Override
+                public long calculateMaxPeekMemory() {
+                  return 0;
+                }
+
+                @Override
+                public long calculateMaxReturnSize() {
+                  return 0;
+                }
+
+                @Override
+                public long calculateRetainedSizeAfterCallingNext() {
+                  return 0;
+                }
               });
 
       int count = 0;
@@ -1007,6 +1082,21 @@ public class LinearFillOperatorTest {
                 public boolean isFinished() {
                   return index >= 7;
                 }
+
+                @Override
+                public long calculateMaxPeekMemory() {
+                  return 0;
+                }
+
+                @Override
+                public long calculateMaxReturnSize() {
+                  return 0;
+                }
+
+                @Override
+                public long calculateRetainedSizeAfterCallingNext() {
+                  return 0;
+                }
               });
 
       int count = 0;
@@ -1112,6 +1202,21 @@ public class LinearFillOperatorTest {
                 public boolean isFinished() {
                   return index >= 7;
                 }
+
+                @Override
+                public long calculateMaxPeekMemory() {
+                  return 0;
+                }
+
+                @Override
+                public long calculateMaxReturnSize() {
+                  return 0;
+                }
+
+                @Override
+                public long calculateRetainedSizeAfterCallingNext() {
+                  return 0;
+                }
               });
 
       int count = 0;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
new file mode 100644
index 0000000000..247eb0fc32
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -0,0 +1,1542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.AggregationOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.FilterAndProjectOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.SortOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.RowBasedTimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryCollectOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQuerySortOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.UpdateLastCacheOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.CountMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.DevicesCountOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.DevicesSchemaScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.LevelTimeSeriesCountOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.NodeManageMemoryMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.NodePathsConvertOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.NodePathsCountOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.NodePathsSchemaScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.PathsUsingTemplateScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaFetchMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaFetchScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaQueryMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaQueryOrderByHeatOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.TimeSeriesCountOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.TimeSeriesSchemaScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.binary.ArithmeticAdditionColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.binary.CompareLessEqualColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.ConstantColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.TimeColumnTransformer;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.read.common.type.BooleanType;
+import org.apache.iotdb.tsfile.read.common.type.LongType;
+import org.apache.iotdb.tsfile.read.common.type.TypeEnum;
+
+import com.google.common.collect.Sets;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryMergeOperator.MAP_NODE_RETRAINED_SIZE;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class OperatorMemoryTest {
+
+  /**
+   * Verifies the memory estimates of {@link SeriesScanOperator}: the peak memory and the maximum
+   * return size are both expected to equal the configured TsFile page size, and nothing is
+   * expected to be retained after {@code next()}.
+   */
+  @Test
+  public void seriesScanOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      MeasurementPath measurementPath =
+          new MeasurementPath("root.SeriesScanOperatorTest.device0.sensor0", TSDataType.INT32);
+      Set<String> allSensors = Sets.newHashSet("sensor0");
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, SeriesScanOperator.class.getSimpleName());
+
+      SeriesScanOperator seriesScanOperator =
+          new SeriesScanOperator(
+              planNodeId,
+              measurementPath,
+              allSensors,
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              null,
+              null,
+              true);
+
+      assertEquals(
+          TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
+          seriesScanOperator.calculateMaxPeekMemory());
+      assertEquals(
+          TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
+          seriesScanOperator.calculateMaxReturnSize());
+      assertEquals(0, seriesScanOperator.calculateRetainedSizeAfterCallingNext());
+
+    } catch (IllegalPathException e) {
+      // Put the cause into the test report instead of only dumping a stack trace to stderr.
+      fail("Unexpected IllegalPathException: " + e.getMessage());
+    } finally {
+      // Always release the notification thread, even when an assertion fails.
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /**
+   * Verifies the memory estimates of {@link AlignedSeriesScanOperator} for an aligned path with
+   * three sensors: the peak memory and the maximum return size are both expected to be four times
+   * the configured TsFile page size (presumably one page per sensor plus one for the time column),
+   * and nothing is expected to be retained after {@code next()}.
+   */
+  @Test
+  public void alignedSeriesScanOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      AlignedPath alignedPath =
+          new AlignedPath(
+              "root.AlignedSeriesScanOperatorTest.device0",
+              Arrays.asList("sensor0", "sensor1", "sensor2"));
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, AlignedSeriesScanOperator.class.getSimpleName());
+
+      AlignedSeriesScanOperator seriesScanOperator =
+          new AlignedSeriesScanOperator(
+              planNodeId,
+              alignedPath,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              null,
+              null,
+              true);
+
+      // Multiply in long arithmetic, consistent with the other tests in this class, so the
+      // expected value cannot overflow int for large configured page sizes.
+      assertEquals(
+          4L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
+          seriesScanOperator.calculateMaxPeekMemory());
+      assertEquals(
+          4L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
+          seriesScanOperator.calculateMaxReturnSize());
+
+      assertEquals(0, seriesScanOperator.calculateRetainedSizeAfterCallingNext());
+
+    } catch (IllegalPathException e) {
+      // Put the cause into the test report instead of only dumping a stack trace to stderr.
+      fail("Unexpected IllegalPathException: " + e.getMessage());
+    } finally {
+      // Always release the notification thread, even when an assertion fails.
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /**
+   * {@link ExchangeOperator} is expected to budget exactly one default-sized TsBlock for both its
+   * peak memory and its return size, and to retain nothing after {@code next()}.
+   */
+  @Test
+  public void exchangeOperatorTest() {
+    final long oneDefaultBlock = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    ExchangeOperator operator = new ExchangeOperator(null, null, null);
+
+    long peek = operator.calculateMaxPeekMemory();
+    long returned = operator.calculateMaxReturnSize();
+    long retained = operator.calculateRetainedSizeAfterCallingNext();
+
+    assertEquals(oneDefaultBlock, peek);
+    assertEquals(oneDefaultBlock, returned);
+    assertEquals(0, retained);
+  }
+
+  /**
+   * {@link LastCacheScanOperator} wraps an already-materialised TsBlock; its peak memory and return
+   * size are expected to equal that block's retained size, with nothing retained afterwards.
+   */
+  @Test
+  public void lastCacheScanOperatorTest() {
+    final long cachedBlockSize = 1024L;
+    TsBlock cachedBlock = Mockito.mock(TsBlock.class);
+    Mockito.when(cachedBlock.getRetainedSizeInBytes()).thenReturn(cachedBlockSize);
+
+    LastCacheScanOperator operator = new LastCacheScanOperator(null, null, cachedBlock);
+
+    assertEquals(cachedBlockSize, operator.calculateMaxPeekMemory());
+    assertEquals(cachedBlockSize, operator.calculateMaxReturnSize());
+    assertEquals(0, operator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * {@link FillOperator} over a single mocked child: the expected peak is twice the child's peak
+   * plus the child's retained size, while return size and retained size pass through unchanged.
+   */
+  @Test
+  public void fillOperatorTest() {
+    final long childPeek = 2048L;
+    final long childReturn = 1024L;
+    final long childRetained = 512L;
+
+    Operator source = Mockito.mock(Operator.class);
+    Mockito.when(source.calculateMaxPeekMemory()).thenReturn(childPeek);
+    Mockito.when(source.calculateMaxReturnSize()).thenReturn(childReturn);
+    Mockito.when(source.calculateRetainedSizeAfterCallingNext()).thenReturn(childRetained);
+
+    IFill[] fills = new IFill[] {null, null};
+    FillOperator fillOperator =
+        new FillOperator(Mockito.mock(OperatorContext.class), fills, source);
+
+    assertEquals(2 * childPeek + childRetained, fillOperator.calculateMaxPeekMemory());
+    assertEquals(childReturn, fillOperator.calculateMaxReturnSize());
+    assertEquals(childRetained, fillOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * Verifies the memory estimates of {@link LastQueryCollectOperator} over four mocked children
+   * with varied sizes: peak memory and return size are expected to be the maximum over the
+   * children, and the retained size is expected to be the sum of the children's retained sizes.
+   */
+  @Test
+  public void lastQueryCollectOperatorTest() {
+    List<Operator> children = new ArrayList<>(4);
+    // Fixed seed: child sizes stay varied, but a failing run can be reproduced exactly.
+    Random random = new Random(4070L);
+    long expectedMaxPeekMemory = 0;
+    long expectedMaxReturnSize = 0;
+    for (int i = 0; i < 4; i++) {
+      Operator child = Mockito.mock(Operator.class);
+      // Peak is drawn from [1024, 2048) and return size from [0, 1024).
+      long currentMaxPeekMemory = random.nextInt(1024) + 1024;
+      long currentMaxReturnSize = random.nextInt(1024);
+      Mockito.when(child.calculateMaxPeekMemory()).thenReturn(currentMaxPeekMemory);
+      Mockito.when(child.calculateMaxReturnSize()).thenReturn(currentMaxReturnSize);
+      Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
+      children.add(child);
+      expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, currentMaxPeekMemory);
+      expectedMaxReturnSize = Math.max(expectedMaxReturnSize, currentMaxReturnSize);
+    }
+    LastQueryCollectOperator lastQueryCollectOperator =
+        new LastQueryCollectOperator(Mockito.mock(OperatorContext.class), children);
+
+    assertEquals(expectedMaxPeekMemory, lastQueryCollectOperator.calculateMaxPeekMemory());
+    assertEquals(expectedMaxReturnSize, lastQueryCollectOperator.calculateMaxReturnSize());
+    assertEquals(4 * 512, lastQueryCollectOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * Verifies the memory estimates of {@link LastQueryMergeOperator} over four mocked children with
+   * varied sizes. The expected values are recomputed here independently: the peak is tracked
+   * child by child while earlier children's output stays cached, and the final merge step adds
+   * the result block plus the per-row map overhead.
+   */
+  @Test
+  public void lastQueryMergeOperatorTest() {
+    List<Operator> children = new ArrayList<>(4);
+    // Fixed seed: child sizes stay varied, but a failing run can be reproduced exactly.
+    Random random = new Random(4070L);
+    long expectedMaxPeekMemory = 0;
+    // Return size plus retained size of all children visited so far.
+    long cachedChildrenSize = 0;
+    long expectedMaxReturnSize = 0;
+    long childSumReturnSize = 0;
+    long minReturnSize = Long.MAX_VALUE;
+    for (int i = 0; i < 4; i++) {
+      Operator child = Mockito.mock(Operator.class);
+      long currentMaxPeekMemory = random.nextInt(1024) + 1024;
+      long currentMaxReturnSize = random.nextInt(1024);
+      minReturnSize = Math.min(minReturnSize, currentMaxReturnSize);
+      childSumReturnSize += currentMaxReturnSize;
+      Mockito.when(child.calculateMaxPeekMemory()).thenReturn(currentMaxPeekMemory);
+      Mockito.when(child.calculateMaxReturnSize()).thenReturn(currentMaxReturnSize);
+      Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
+      children.add(child);
+      expectedMaxReturnSize = Math.max(expectedMaxReturnSize, currentMaxReturnSize);
+      // While this child runs, everything produced by the previous children is still cached.
+      expectedMaxPeekMemory =
+          Math.max(expectedMaxPeekMemory, cachedChildrenSize + child.calculateMaxPeekMemory());
+      cachedChildrenSize +=
+          (child.calculateMaxReturnSize() + child.calculateRetainedSizeAfterCallingNext());
+    }
+    // The operator caches the TsBlocks of all its children and then returns a new TsBlock whose
+    // maximum possible size equals the largest return size among the children; the memory used
+    // by the TreeMap nodes must be taken into account as well.
+    expectedMaxPeekMemory =
+        Math.max(
+            expectedMaxPeekMemory,
+            cachedChildrenSize
+                + expectedMaxReturnSize
+                + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()
+                    * MAP_NODE_RETRAINED_SIZE);
+
+    LastQueryMergeOperator lastQueryMergeOperator =
+        new LastQueryMergeOperator(
+            Mockito.mock(OperatorContext.class), children, Comparator.naturalOrder());
+
+    assertEquals(expectedMaxPeekMemory, lastQueryMergeOperator.calculateMaxPeekMemory());
+    assertEquals(expectedMaxReturnSize, lastQueryMergeOperator.calculateMaxReturnSize());
+    assertEquals(
+        childSumReturnSize
+            - minReturnSize
+            + 4 * 512
+            + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()
+                * MAP_NODE_RETRAINED_SIZE,
+        lastQueryMergeOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * {@link LastQueryOperator} with a mocked result builder and four mocked children. Checked
+   * twice: first with a builder smaller than the default TsBlock size, then with a builder that
+   * has grown beyond it.
+   */
+  @Test
+  public void lastQueryOperatorTest() {
+    final int childCount = 4;
+    final long childPeek = 2 * 1024 * 1024L;
+    final long childReturn = 1024L;
+    final long childRetained = 512L;
+
+    TsBlockBuilder resultBuilder = Mockito.mock(TsBlockBuilder.class);
+    Mockito.when(resultBuilder.getRetainedSizeInBytes()).thenReturn(1024L);
+
+    List<UpdateLastCacheOperator> children = new ArrayList<>(childCount);
+    for (int i = 0; i < childCount; i++) {
+      UpdateLastCacheOperator child = Mockito.mock(UpdateLastCacheOperator.class);
+      Mockito.when(child.calculateMaxPeekMemory()).thenReturn(childPeek);
+      Mockito.when(child.calculateMaxReturnSize()).thenReturn(childReturn);
+      Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(childRetained);
+      children.add(child);
+    }
+    long expectedMaxReturnSize = Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, childReturn);
+
+    LastQueryOperator lastQueryOperator =
+        new LastQueryOperator(Mockito.mock(OperatorContext.class), children, resultBuilder);
+
+    // Phase 1: the builder retains only 1 KiB.
+    assertEquals(
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + childPeek, lastQueryOperator.calculateMaxPeekMemory());
+    assertEquals(expectedMaxReturnSize, lastQueryOperator.calculateMaxReturnSize());
+    assertEquals(
+        childCount * childRetained, lastQueryOperator.calculateRetainedSizeAfterCallingNext());
+
+    // Phase 2: the builder now retains 4 MiB.
+    final long grownBuilderSize = 4 * 1024 * 1024L;
+    Mockito.when(resultBuilder.getRetainedSizeInBytes()).thenReturn(grownBuilderSize);
+    assertEquals(grownBuilderSize + childPeek, lastQueryOperator.calculateMaxPeekMemory());
+    assertEquals(grownBuilderSize, lastQueryOperator.calculateMaxReturnSize());
+    assertEquals(
+        childCount * childRetained, lastQueryOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * {@link LastQuerySortOperator} with a mocked 16 KiB / 16-row cached block and four mocked
+   * children; checks peak memory, return size and retained size against hand-computed values.
+   */
+  @Test
+  public void lastQuerySortOperatorTest() {
+    final int childCount = 4;
+    final long childPeek = 2 * 1024L;
+    final long childReturn = 1024L;
+    final long childRetained = 512L;
+    final long cachedBlockSize = 16 * 1024L;
+
+    TsBlock cachedBlock = Mockito.mock(TsBlock.class);
+    Mockito.when(cachedBlock.getRetainedSizeInBytes()).thenReturn(cachedBlockSize);
+    Mockito.when(cachedBlock.getPositionCount()).thenReturn(16);
+
+    List<UpdateLastCacheOperator> children = new ArrayList<>(childCount);
+    for (int i = 0; i < childCount; i++) {
+      UpdateLastCacheOperator child = Mockito.mock(UpdateLastCacheOperator.class);
+      Mockito.when(child.calculateMaxPeekMemory()).thenReturn(childPeek);
+      Mockito.when(child.calculateMaxReturnSize()).thenReturn(childReturn);
+      Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(childRetained);
+      children.add(child);
+    }
+
+    LastQuerySortOperator lastQuerySortOperator =
+        new LastQuerySortOperator(
+            Mockito.mock(OperatorContext.class), cachedBlock, children, Comparator.naturalOrder());
+
+    long expectedPeek = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + cachedBlockSize + childPeek;
+    long expectedRetained = cachedBlockSize + childReturn + childCount * childRetained;
+
+    assertEquals(expectedPeek, lastQuerySortOperator.calculateMaxPeekMemory());
+    assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, lastQuerySortOperator.calculateMaxReturnSize());
+    assertEquals(expectedRetained, lastQuerySortOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /** {@link LimitOperator} is expected to pass all three estimates of its child through as-is. */
+  @Test
+  public void limitOperatorTest() {
+    final long childPeek = 2 * 1024L;
+    final long childReturn = 1024L;
+    final long childRetained = 512L;
+
+    Operator source = Mockito.mock(Operator.class);
+    Mockito.when(source.calculateMaxPeekMemory()).thenReturn(childPeek);
+    Mockito.when(source.calculateMaxReturnSize()).thenReturn(childReturn);
+    Mockito.when(source.calculateRetainedSizeAfterCallingNext()).thenReturn(childRetained);
+
+    OperatorContext context = Mockito.mock(OperatorContext.class);
+    LimitOperator limitOperator = new LimitOperator(context, 100, source);
+
+    assertEquals(childPeek, limitOperator.calculateMaxPeekMemory());
+    assertEquals(childReturn, limitOperator.calculateMaxReturnSize());
+    assertEquals(childRetained, limitOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /** {@link OffsetOperator} is expected to pass all three estimates of its child through as-is. */
+  @Test
+  public void offsetOperatorTest() {
+    final long childPeek = 2 * 1024L;
+    final long childReturn = 1024L;
+    final long childRetained = 512L;
+
+    Operator source = Mockito.mock(Operator.class);
+    Mockito.when(source.calculateMaxPeekMemory()).thenReturn(childPeek);
+    Mockito.when(source.calculateMaxReturnSize()).thenReturn(childReturn);
+    Mockito.when(source.calculateRetainedSizeAfterCallingNext()).thenReturn(childRetained);
+
+    OperatorContext context = Mockito.mock(OperatorContext.class);
+    OffsetOperator offsetOperator = new OffsetOperator(context, 100, source);
+
+    assertEquals(childPeek, offsetOperator.calculateMaxPeekMemory());
+    assertEquals(childReturn, offsetOperator.calculateMaxReturnSize());
+    assertEquals(childRetained, offsetOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * {@link RowBasedTimeJoinOperator} over four identical mocked children and two INT32 output
+   * columns. The expected peak is the larger of (a) the highest point reached while fetching a
+   * child with the earlier children's output still cached and (b) all children's output cached
+   * plus the operator's own result block.
+   */
+  @Test
+  public void rowBasedTimeJoinOperatorTest() {
+    final int childCount = 4;
+    final long childPeek = 128 * 1024L;
+    final long childReturn = 64 * 1024L;
+
+    List<TSDataType> dataTypeList =
+        new ArrayList<>(Arrays.asList(TSDataType.INT32, TSDataType.INT32));
+    long expectedMaxReturnSize =
+        3L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+
+    List<Operator> children = new ArrayList<>(childCount);
+    long cachedChildOutput = 0;
+    long peakWhileFetching = 0;
+    for (int i = 0; i < childCount; i++) {
+      Operator child = Mockito.mock(Operator.class);
+      Mockito.when(child.calculateMaxPeekMemory()).thenReturn(childPeek);
+      Mockito.when(child.calculateMaxReturnSize()).thenReturn(childReturn);
+      Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+      children.add(child);
+      peakWhileFetching = Math.max(peakWhileFetching, cachedChildOutput + childPeek);
+      cachedChildOutput += childReturn;
+    }
+    long expectedMaxPeekMemory =
+        Math.max(cachedChildOutput + expectedMaxReturnSize, peakWhileFetching);
+
+    RowBasedTimeJoinOperator rowBasedTimeJoinOperator =
+        new RowBasedTimeJoinOperator(
+            Mockito.mock(OperatorContext.class), children, Ordering.ASC, dataTypeList, null, null);
+
+    assertEquals(expectedMaxPeekMemory, rowBasedTimeJoinOperator.calculateMaxPeekMemory());
+    assertEquals(expectedMaxReturnSize, rowBasedTimeJoinOperator.calculateMaxReturnSize());
+    assertEquals(
+        (childCount - 1) * childReturn,
+        rowBasedTimeJoinOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /** {@link SortOperator} is expected to report zero for every memory estimate. */
+  @Test
+  public void sortOperatorTest() {
+    SortOperator operator = new SortOperator();
+
+    long peek = operator.calculateMaxPeekMemory();
+    long returned = operator.calculateMaxReturnSize();
+    long retained = operator.calculateRetainedSizeAfterCallingNext();
+
+    assertEquals(0, peek);
+    assertEquals(0, returned);
+    assertEquals(0, retained);
+  }
+
+  /**
+   * {@link TimeJoinOperator} over four identical mocked children and two INT32 output columns.
+   * The expected peak is the larger of (a) the highest point reached while fetching a child with
+   * the earlier children's output still cached and (b) all children's output cached plus the
+   * operator's own result block.
+   */
+  @Test
+  public void timeJoinOperatorTest() {
+    final int childCount = 4;
+    final long childPeek = 128 * 1024L;
+    final long childReturn = 64 * 1024L;
+
+    List<TSDataType> dataTypeList =
+        new ArrayList<>(Arrays.asList(TSDataType.INT32, TSDataType.INT32));
+    long expectedMaxReturnSize =
+        3L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+
+    List<Operator> children = new ArrayList<>(childCount);
+    long cachedChildOutput = 0;
+    long peakWhileFetching = 0;
+    for (int i = 0; i < childCount; i++) {
+      Operator child = Mockito.mock(Operator.class);
+      Mockito.when(child.calculateMaxPeekMemory()).thenReturn(childPeek);
+      Mockito.when(child.calculateMaxReturnSize()).thenReturn(childReturn);
+      Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+      children.add(child);
+      peakWhileFetching = Math.max(peakWhileFetching, cachedChildOutput + childPeek);
+      cachedChildOutput += childReturn;
+    }
+    long expectedMaxPeekMemory =
+        Math.max(cachedChildOutput + expectedMaxReturnSize, peakWhileFetching);
+
+    TimeJoinOperator timeJoinOperator =
+        new TimeJoinOperator(
+            Mockito.mock(OperatorContext.class), children, Ordering.ASC, dataTypeList, null, null);
+
+    assertEquals(expectedMaxPeekMemory, timeJoinOperator.calculateMaxPeekMemory());
+    assertEquals(expectedMaxReturnSize, timeJoinOperator.calculateMaxReturnSize());
+    assertEquals(
+        (childCount - 1) * childReturn, timeJoinOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * {@link UpdateLastCacheOperator} is expected to pass all three estimates of its child through
+   * unchanged.
+   */
+  @Test
+  public void updateLastCacheOperatorTest() {
+    final long childPeek = 2048L;
+    final long childReturn = 1024L;
+    final long childRetained = 512L;
+
+    Operator source = Mockito.mock(Operator.class);
+    Mockito.when(source.calculateMaxPeekMemory()).thenReturn(childPeek);
+    Mockito.when(source.calculateMaxReturnSize()).thenReturn(childReturn);
+    Mockito.when(source.calculateRetainedSizeAfterCallingNext()).thenReturn(childRetained);
+
+    UpdateLastCacheOperator operator =
+        new UpdateLastCacheOperator(null, source, null, TSDataType.BOOLEAN, null, true);
+
+    assertEquals(childPeek, operator.calculateMaxPeekMemory());
+    assertEquals(childReturn, operator.calculateMaxReturnSize());
+    assertEquals(childRetained, operator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * {@link LinearFillOperator} over a single mocked child: the expected peak is three times the
+   * child's peak plus the child's retained size, while return size and retained size pass through
+   * unchanged.
+   */
+  @Test
+  public void linearFillOperatorTest() {
+    final long childPeek = 2048L;
+    final long childReturn = 1024L;
+    final long childRetained = 512L;
+
+    Operator source = Mockito.mock(Operator.class);
+    Mockito.when(source.calculateMaxPeekMemory()).thenReturn(childPeek);
+    Mockito.when(source.calculateMaxReturnSize()).thenReturn(childReturn);
+    Mockito.when(source.calculateRetainedSizeAfterCallingNext()).thenReturn(childRetained);
+
+    LinearFill[] fills = new LinearFill[] {null, null};
+    LinearFillOperator linearFillOperator =
+        new LinearFillOperator(Mockito.mock(OperatorContext.class), fills, source);
+
+    assertEquals(3 * childPeek + childRetained, linearFillOperator.calculateMaxPeekMemory());
+    assertEquals(childReturn, linearFillOperator.calculateMaxReturnSize());
+    assertEquals(childRetained, linearFillOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * {@link DeviceMergeOperator} over four devices, each backed by an identical mocked child, with
+   * two INT32 output columns; checks peak memory, return size and retained size against
+   * hand-computed values.
+   */
+  @Test
+  public void deviceMergeOperatorTest() {
+    final int childCount = 4;
+    final long childPeek = 128 * 1024L;
+    final long childReturn = 64 * 1024L;
+    final long childRetained = 64 * 1024L;
+    // Amount added per child to both the expected peak and the expected retained size.
+    final long perChildShare = 128 * 1024L;
+    final long pageSize = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+
+    List<TSDataType> dataTypeList =
+        new ArrayList<>(Arrays.asList(TSDataType.INT32, TSDataType.INT32));
+    List<String> devices =
+        new ArrayList<>(Arrays.asList("device1", "device2", "device3", "device4"));
+
+    List<Operator> children = new ArrayList<>(childCount);
+    long accumulatedPeek = pageSize;
+    long accumulatedRetained = 0;
+    long largestChildPeek = 0;
+    for (int i = 0; i < childCount; i++) {
+      Operator child = Mockito.mock(Operator.class);
+      Mockito.when(child.calculateMaxPeekMemory()).thenReturn(childPeek);
+      Mockito.when(child.calculateMaxReturnSize()).thenReturn(childReturn);
+      Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(childRetained);
+      children.add(child);
+      accumulatedPeek += perChildShare;
+      accumulatedRetained += perChildShare;
+      largestChildPeek = Math.max(largestChildPeek, childPeek);
+    }
+    long expectedMaxPeekMemory = Math.max(accumulatedPeek, largestChildPeek);
+    long expectedMaxReturnSize = 3L * pageSize;
+
+    DeviceMergeOperator deviceMergeOperator =
+        new DeviceMergeOperator(
+            Mockito.mock(OperatorContext.class),
+            devices,
+            children,
+            dataTypeList,
+            Mockito.mock(TimeSelector.class),
+            Mockito.mock(TimeComparator.class));
+
+    assertEquals(expectedMaxPeekMemory, deviceMergeOperator.calculateMaxPeekMemory());
+    assertEquals(expectedMaxReturnSize, deviceMergeOperator.calculateMaxReturnSize());
+    assertEquals(
+        accumulatedRetained - 64 * 1024L,
+        deviceMergeOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * {@link DeviceViewOperator} over four devices, each backed by an identical mocked child, with
+   * two INT32 output columns; checks peak memory, return size and retained size against
+   * hand-computed values.
+   */
+  @Test
+  public void deviceViewOperatorTest() {
+    final int childCount = 4;
+    final long childSize = 1024L;
+
+    List<TSDataType> dataTypeList =
+        new ArrayList<>(Arrays.asList(TSDataType.INT32, TSDataType.INT32));
+    List<String> devices =
+        new ArrayList<>(Arrays.asList("device1", "device2", "device3", "device4"));
+
+    long expectedMaxReturnSize =
+        2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+
+    List<Operator> children = new ArrayList<>(childCount);
+    long accumulatedPeek = expectedMaxReturnSize;
+    long expectedRetainedSizeAfterCallingNext = 0;
+    long largestChildPeek = 0;
+    for (int i = 0; i < childCount; i++) {
+      Operator child = Mockito.mock(Operator.class);
+      Mockito.when(child.calculateMaxPeekMemory()).thenReturn(childSize);
+      Mockito.when(child.calculateMaxReturnSize()).thenReturn(childSize);
+      Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(childSize);
+      children.add(child);
+      accumulatedPeek += childSize;
+      expectedRetainedSizeAfterCallingNext += childSize;
+      largestChildPeek = Math.max(largestChildPeek, childSize);
+    }
+    long expectedMaxPeekMemory = Math.max(accumulatedPeek, largestChildPeek);
+
+    DeviceViewOperator deviceViewOperator =
+        new DeviceViewOperator(
+            Mockito.mock(OperatorContext.class),
+            devices,
+            children,
+            new ArrayList<>(),
+            dataTypeList);
+
+    assertEquals(expectedMaxPeekMemory, deviceViewOperator.calculateMaxPeekMemory());
+    assertEquals(expectedMaxReturnSize, deviceViewOperator.calculateMaxReturnSize());
+    assertEquals(
+        expectedRetainedSizeAfterCallingNext,
+        deviceViewOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * Verifies the memory estimates of {@link FilterAndProjectOperator} built from one mocked child,
+   * a less-or-equal filter transformer and a single addition projection transformer. The expected
+   * peak is four page sizes plus the child's retained size, the expected return size is two page
+   * sizes, and the retained size is expected to equal the child's.
+   */
+  @Test
+  public void filterAndProjectOperatorTest() {
+    Operator child = Mockito.mock(Operator.class);
+    Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L);
+    Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+    Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
+    // Mocked type descriptors that only answer getTypeEnum().
+    BooleanType booleanType = Mockito.mock(BooleanType.class);
+    Mockito.when(booleanType.getTypeEnum()).thenReturn(TypeEnum.BOOLEAN);
+    LongType longType = Mockito.mock(LongType.class);
+    Mockito.when(longType.getTypeEnum()).thenReturn(TypeEnum.INT64);
+    ColumnTransformer filterColumnTransformer =
+        new CompareLessEqualColumnTransformer(
+            booleanType,
+            new TimeColumnTransformer(longType),
+            new ConstantColumnTransformer(longType, Mockito.mock(IntColumn.class)));
+    List<TSDataType> filterOutputTypes = new ArrayList<>();
+    filterOutputTypes.add(TSDataType.INT32);
+    filterOutputTypes.add(TSDataType.INT64);
+    List<ColumnTransformer> projectColumnTransformers = new ArrayList<>();
+    projectColumnTransformers.add(
+        new ArithmeticAdditionColumnTransformer(
+            booleanType,
+            new TimeColumnTransformer(longType),
+            new ConstantColumnTransformer(longType, Mockito.mock(IntColumn.class))));
+
+    FilterAndProjectOperator operator =
+        new FilterAndProjectOperator(
+            Mockito.mock(OperatorContext.class),
+            child,
+            filterOutputTypes,
+            new ArrayList<>(),
+            filterColumnTransformer,
+            new ArrayList<>(),
+            new ArrayList<>(),
+            projectColumnTransformers,
+            false,
+            true);
+
+    assertEquals(
+        4L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() + 512L,
+        operator.calculateMaxPeekMemory());
+    // Long arithmetic, matching the assertion above, so the product cannot overflow int.
+    assertEquals(
+        2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
+        operator.calculateMaxReturnSize());
+    assertEquals(512, operator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * Verifies the memory estimates of {@link TimeSeriesSchemaScanOperator}: peak memory and return
+   * size are both expected to be one default-sized TsBlock, with nothing retained after
+   * {@code next()}.
+   */
+  @Test
+  public void TimeSeriesSchemaScanOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      // Register the context under the operator actually being tested (previously it carried
+      // SeriesScanOperator's name, left over from a copied test).
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, TimeSeriesSchemaScanOperator.class.getSimpleName());
+
+      TimeSeriesSchemaScanOperator operator =
+          new TimeSeriesSchemaScanOperator(
+              planNodeId,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              0,
+              0,
+              null,
+              null,
+              null,
+              false,
+              false,
+              false,
+              null);
+
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxPeekMemory());
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxReturnSize());
+      assertEquals(0, operator.calculateRetainedSizeAfterCallingNext());
+
+    } finally {
+      // Always release the notification thread, even when an assertion fails.
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /**
+   * Checks the memory estimates reported by a {@link DevicesSchemaScanOperator}: the max peek
+   * memory and the max return size must both equal {@code DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES}, and
+   * the retained size after calling next must be 0.
+   */
+  @Test
+  public void DeviceSchemaScanOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      // Label the context with the operator that is actually built below, not an unrelated one.
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, DevicesSchemaScanOperator.class.getSimpleName());
+
+      DevicesSchemaScanOperator operator =
+          new DevicesSchemaScanOperator(
+              planNodeId,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              0,
+              0,
+              null,
+              false,
+              false);
+
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxPeekMemory());
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxReturnSize());
+      assertEquals(0, operator.calculateRetainedSizeAfterCallingNext());
+
+    } finally {
+      // Always release the notification thread, even when an assertion fails.
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /**
+   * Checks the memory estimates reported by a {@link PathsUsingTemplateScanOperator}: the max peek
+   * memory and the max return size must both equal {@code DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES}, and
+   * the retained size after calling next must be 0.
+   */
+  @Test
+  public void PathsUsingTemplateScanOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      // Label the context with the operator that is actually built below, not an unrelated one.
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, PathsUsingTemplateScanOperator.class.getSimpleName());
+
+      PathsUsingTemplateScanOperator operator =
+          new PathsUsingTemplateScanOperator(
+              planNodeId, fragmentInstanceContext.getOperatorContexts().get(0), 0);
+
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxPeekMemory());
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxReturnSize());
+      assertEquals(0, operator.calculateRetainedSizeAfterCallingNext());
+
+    } finally {
+      // Always release the notification thread, even when an assertion fails.
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /**
+   * Checks the memory estimates reported by a {@link TimeSeriesCountOperator}: the max peek memory
+   * and the max return size must both equal {@code 4L}, and the retained size after calling next
+   * must be 0.
+   */
+  @Test
+  public void TimeSeriesCountOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      // Label the context with the operator that is actually built below, not an unrelated one.
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, TimeSeriesCountOperator.class.getSimpleName());
+
+      TimeSeriesCountOperator operator =
+          new TimeSeriesCountOperator(
+              planNodeId,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              null,
+              false,
+              null,
+              null,
+              false);
+
+      assertEquals(4L, operator.calculateMaxPeekMemory());
+      assertEquals(4L, operator.calculateMaxReturnSize());
+      assertEquals(0, operator.calculateRetainedSizeAfterCallingNext());
+
+    } finally {
+      // Always release the notification thread, even when an assertion fails.
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /**
+   * Checks the memory estimates reported by a {@link LevelTimeSeriesCountOperator}: the max peek
+   * memory and the max return size must both equal {@code DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES}, and
+   * the retained size after calling next must be 0.
+   */
+  @Test
+  public void LevelTimeSeriesCountOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      // Label the context with the operator that is actually built below, not an unrelated one.
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, LevelTimeSeriesCountOperator.class.getSimpleName());
+
+      LevelTimeSeriesCountOperator operator =
+          new LevelTimeSeriesCountOperator(
+              planNodeId,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              null,
+              false,
+              4,
+              null,
+              null,
+              false);
+
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxPeekMemory());
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxReturnSize());
+      assertEquals(0, operator.calculateRetainedSizeAfterCallingNext());
+
+    } finally {
+      // Always release the notification thread, even when an assertion fails.
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /**
+   * Checks the memory estimates reported by a {@link DevicesCountOperator}: the max peek memory
+   * and the max return size must both equal {@code 4L}, and the retained size after calling next
+   * must be 0.
+   */
+  @Test
+  public void DevicesCountOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      // Label the context with the operator that is actually built below, not an unrelated one.
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, DevicesCountOperator.class.getSimpleName());
+
+      DevicesCountOperator operator =
+          new DevicesCountOperator(
+              planNodeId, fragmentInstanceContext.getOperatorContexts().get(0), null, false);
+
+      assertEquals(4L, operator.calculateMaxPeekMemory());
+      assertEquals(4L, operator.calculateMaxReturnSize());
+      assertEquals(0, operator.calculateRetainedSizeAfterCallingNext());
+
+    } finally {
+      // Always release the notification thread, even when an assertion fails.
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /**
+   * Builds a {@link SchemaQueryMergeOperator} over four mocked children and checks its memory
+   * estimates against values derived from the children: the maximum of their max peek memory, the
+   * maximum of their max return size, and the sum of their retained sizes.
+   */
+  @Test
+  public void SchemaQueryMergeOperatorTest() {
+    List<Operator> mockChildren = new ArrayList<>(4);
+    long maxPeek = 0;
+    long maxReturn = 0;
+    long retainedSum = 0;
+
+    for (int remaining = 4; remaining > 0; remaining--) {
+      // Each child reports fixed, stubbed estimates.
+      Operator mockChild = Mockito.mock(Operator.class);
+      Mockito.when(mockChild.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+      Mockito.when(mockChild.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+      Mockito.when(mockChild.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+      mockChildren.add(mockChild);
+
+      // Fold the child's estimates into the expected values.
+      maxPeek = Math.max(maxPeek, mockChild.calculateMaxPeekMemory());
+      maxReturn = Math.max(maxReturn, mockChild.calculateMaxReturnSize());
+      retainedSum += mockChild.calculateRetainedSizeAfterCallingNext();
+    }
+
+    QueryId stubQueryId = new QueryId("stub_query");
+    SchemaQueryMergeOperator mergeOperator =
+        new SchemaQueryMergeOperator(
+            stubQueryId.genPlanNodeId(), Mockito.mock(OperatorContext.class), mockChildren);
+
+    assertEquals(maxPeek, mergeOperator.calculateMaxPeekMemory());
+    assertEquals(maxReturn, mergeOperator.calculateMaxReturnSize());
+    assertEquals(retainedSum, mergeOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * Builds a {@link CountMergeOperator} over four mocked children and checks its memory estimates
+   * against values derived from the children: the maximum of their max peek memory, the maximum
+   * of their max return size, and the sum of their retained sizes.
+   */
+  @Test
+  public void CountMergeOperatorTest() {
+    List<Operator> mockChildren = new ArrayList<>(4);
+    long maxPeek = 0;
+    long maxReturn = 0;
+    long retainedSum = 0;
+
+    for (int remaining = 4; remaining > 0; remaining--) {
+      // Each child reports fixed, stubbed estimates.
+      Operator mockChild = Mockito.mock(Operator.class);
+      Mockito.when(mockChild.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+      Mockito.when(mockChild.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+      Mockito.when(mockChild.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+      mockChildren.add(mockChild);
+
+      // Fold the child's estimates into the expected values.
+      maxPeek = Math.max(maxPeek, mockChild.calculateMaxPeekMemory());
+      maxReturn = Math.max(maxReturn, mockChild.calculateMaxReturnSize());
+      retainedSum += mockChild.calculateRetainedSizeAfterCallingNext();
+    }
+
+    QueryId stubQueryId = new QueryId("stub_query");
+    CountMergeOperator mergeOperator =
+        new CountMergeOperator(
+            stubQueryId.genPlanNodeId(), Mockito.mock(OperatorContext.class), mockChildren);
+
+    assertEquals(maxPeek, mergeOperator.calculateMaxPeekMemory());
+    assertEquals(maxReturn, mergeOperator.calculateMaxReturnSize());
+    assertEquals(retainedSum, mergeOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * Checks the memory estimates reported by a {@link SchemaFetchScanOperator}: the max peek memory
+   * and the max return size must both equal {@code DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES}, and the
+   * retained size after calling next must be 0.
+   */
+  @Test
+  public void SchemaFetchScanOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      // Label the context with the operator that is actually built below, not an unrelated one.
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, SchemaFetchScanOperator.class.getSimpleName());
+
+      SchemaFetchScanOperator operator =
+          new SchemaFetchScanOperator(
+              planNodeId, fragmentInstanceContext.getOperatorContexts().get(0), null, null, null);
+
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxPeekMemory());
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxReturnSize());
+      assertEquals(0, operator.calculateRetainedSizeAfterCallingNext());
+
+    } finally {
+      // Always release the notification thread, even when an assertion fails.
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /**
+   * Builds a {@link SchemaFetchMergeOperator} over four mocked children and checks its memory
+   * estimates against values derived from the children: the maximum of their max peek memory, the
+   * maximum of their max return size, and the sum of their retained sizes.
+   */
+  @Test
+  public void SchemaFetchMergeOperatorTest() {
+    List<Operator> mockChildren = new ArrayList<>(4);
+    long maxPeek = 0;
+    long maxReturn = 0;
+    long retainedSum = 0;
+
+    for (int remaining = 4; remaining > 0; remaining--) {
+      // Each child reports fixed, stubbed estimates.
+      Operator mockChild = Mockito.mock(Operator.class);
+      Mockito.when(mockChild.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+      Mockito.when(mockChild.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+      Mockito.when(mockChild.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+      mockChildren.add(mockChild);
+
+      // Fold the child's estimates into the expected values.
+      maxPeek = Math.max(maxPeek, mockChild.calculateMaxPeekMemory());
+      maxReturn = Math.max(maxReturn, mockChild.calculateMaxReturnSize());
+      retainedSum += mockChild.calculateRetainedSizeAfterCallingNext();
+    }
+
+    SchemaFetchMergeOperator mergeOperator =
+        new SchemaFetchMergeOperator(Mockito.mock(OperatorContext.class), mockChildren, null);
+
+    assertEquals(maxPeek, mergeOperator.calculateMaxPeekMemory());
+    assertEquals(maxReturn, mergeOperator.calculateMaxReturnSize());
+    assertEquals(retainedSum, mergeOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * Builds a {@link SchemaQueryOrderByHeatOperator} over four mocked children and checks its
+   * memory estimates. The expected peek memory and return size are each the sum of the children's
+   * max return sizes; the expected retained size adds the children's retained sizes to that sum.
+   */
+  @Test
+  public void SchemaQueryOrderByHeatOperatorTest() {
+    List<Operator> mockChildren = new ArrayList<>(4);
+    long peekSum = 0;
+    long returnSum = 0;
+    long retainedSum = 0;
+
+    for (int remaining = 4; remaining > 0; remaining--) {
+      // Each child reports fixed, stubbed estimates.
+      Operator mockChild = Mockito.mock(Operator.class);
+      Mockito.when(mockChild.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+      Mockito.when(mockChild.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+      Mockito.when(mockChild.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+      mockChildren.add(mockChild);
+
+      // All three expectations accumulate the child's max return size.
+      peekSum += mockChild.calculateMaxReturnSize();
+      returnSum += mockChild.calculateMaxReturnSize();
+      retainedSum +=
+          mockChild.calculateRetainedSizeAfterCallingNext() + mockChild.calculateMaxReturnSize();
+    }
+
+    SchemaQueryOrderByHeatOperator orderByHeatOperator =
+        new SchemaQueryOrderByHeatOperator(Mockito.mock(OperatorContext.class), mockChildren);
+
+    assertEquals(peekSum, orderByHeatOperator.calculateMaxPeekMemory());
+    assertEquals(returnSum, orderByHeatOperator.calculateMaxReturnSize());
+    assertEquals(retainedSum, orderByHeatOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * Checks the memory estimates reported by a {@link NodePathsSchemaScanOperator}: the max peek
+   * memory and the max return size must both equal {@code DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES}, and
+   * the retained size after calling next must be 0.
+   */
+  @Test
+  public void NodePathsSchemaScanOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      // Label the context with the operator that is actually built below, not an unrelated one.
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, NodePathsSchemaScanOperator.class.getSimpleName());
+
+      NodePathsSchemaScanOperator operator =
+          new NodePathsSchemaScanOperator(
+              planNodeId, fragmentInstanceContext.getOperatorContexts().get(0), null, 4);
+
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxPeekMemory());
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxReturnSize());
+      assertEquals(0, operator.calculateRetainedSizeAfterCallingNext());
+
+    } finally {
+      // Always release the notification thread, even when an assertion fails.
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /**
+   * Wraps one mocked child in a {@link NodePathsConvertOperator} and checks its memory estimates:
+   * peek memory is expected to be the child's max peek memory plus the child's max return size,
+   * while return size and retained size are expected to match the child's.
+   */
+  @Test
+  public void NodePathsConvertOperatorTest() {
+    // The child reports fixed, stubbed estimates.
+    Operator mockChild = Mockito.mock(Operator.class);
+    Mockito.when(mockChild.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+    Mockito.when(mockChild.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+    Mockito.when(mockChild.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+
+    long peekExpectation =
+        mockChild.calculateMaxPeekMemory() + mockChild.calculateMaxReturnSize();
+    long returnExpectation = mockChild.calculateMaxReturnSize();
+    long retainedExpectation = mockChild.calculateRetainedSizeAfterCallingNext();
+
+    OperatorContext mockContext = Mockito.mock(OperatorContext.class);
+    NodePathsConvertOperator convertOperator = new NodePathsConvertOperator(mockContext, mockChild);
+
+    assertEquals(peekExpectation, convertOperator.calculateMaxPeekMemory());
+    assertEquals(returnExpectation, convertOperator.calculateMaxReturnSize());
+    assertEquals(retainedExpectation, convertOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * Wraps one mocked child in a {@link NodePathsCountOperator} and checks its memory estimates.
+   * Peek memory is expected to be the larger of twice {@code DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES}
+   * and the child's max peek memory; return size the larger of that constant and the child's max
+   * return size; retained size that constant plus the child's retained size.
+   */
+  @Test
+  public void NodePathsCountOperatorTest() {
+    // The child reports fixed, stubbed estimates.
+    Operator mockChild = Mockito.mock(Operator.class);
+    Mockito.when(mockChild.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+    Mockito.when(mockChild.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+    Mockito.when(mockChild.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+
+    long childPeek = mockChild.calculateMaxPeekMemory();
+    long peekExpectation = Math.max(2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, childPeek);
+    long childReturn = mockChild.calculateMaxReturnSize();
+    long returnExpectation = Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, childReturn);
+    long childRetained = mockChild.calculateRetainedSizeAfterCallingNext();
+    long retainedExpectation = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + childRetained;
+
+    OperatorContext mockContext = Mockito.mock(OperatorContext.class);
+    NodePathsCountOperator countOperator = new NodePathsCountOperator(mockContext, mockChild);
+
+    assertEquals(peekExpectation, countOperator.calculateMaxPeekMemory());
+    assertEquals(returnExpectation, countOperator.calculateMaxReturnSize());
+    assertEquals(retainedExpectation, countOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * Wraps one mocked child in a {@link NodeManageMemoryMergeOperator} (with an empty set as its
+   * second argument) and checks its memory estimates. Peek memory is expected to be the larger of
+   * twice {@code DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES} and the child's max peek memory; return size
+   * the larger of that constant and the child's max return size; retained size that constant plus
+   * the child's retained size.
+   */
+  @Test
+  public void NodeManageMemoryMergeOperatorTest() {
+    // The child reports fixed, stubbed estimates.
+    Operator mockChild = Mockito.mock(Operator.class);
+    Mockito.when(mockChild.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+    Mockito.when(mockChild.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+    Mockito.when(mockChild.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+
+    long childPeek = mockChild.calculateMaxPeekMemory();
+    long peekExpectation = Math.max(2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, childPeek);
+    long childReturn = mockChild.calculateMaxReturnSize();
+    long returnExpectation = Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, childReturn);
+    long childRetained = mockChild.calculateRetainedSizeAfterCallingNext();
+    long retainedExpectation = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + childRetained;
+
+    NodeManageMemoryMergeOperator mergeOperator =
+        new NodeManageMemoryMergeOperator(
+            Mockito.mock(OperatorContext.class), Collections.emptySet(), mockChild);
+
+    assertEquals(peekExpectation, mergeOperator.calculateMaxPeekMemory());
+    assertEquals(returnExpectation, mergeOperator.calculateMaxReturnSize());
+    assertEquals(retainedExpectation, mergeOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * Checks the memory estimates of {@link SeriesAggregationScanOperator} for five set-ups: no
+   * group by with SINGLE step, no group by with PARTIAL step, group by with few windows, group by
+   * with many windows, and a result that exceeds {@code DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES}.
+   *
+   * <p>In every case the retained size is expected to be twice the configured page size, and the
+   * max peek memory is expected to be the max return size plus that retained size.
+   *
+   * @throws IllegalPathException if building the stub path or an operator rejects the path; the
+   *     exception is propagated so the test report carries its stack trace
+   */
+  @Test
+  public void seriesAggregationScanOperatorTest() throws IllegalPathException {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      MeasurementPath measurementPath = new MeasurementPath("root.sg.d1.s1", TSDataType.TEXT);
+      TypeProvider typeProvider = new TypeProvider();
+      typeProvider.setType("count(root.sg.d1.s1)", TSDataType.INT64);
+      typeProvider.setType("min_time(root.sg.d1.s1)", TSDataType.INT64);
+      typeProvider.setType("first_value(root.sg.d1.s1)", TSDataType.TEXT);
+
+      // The expected retained size is the same in all five cases.
+      long expectedMaxRetainSize =
+          2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+      long maxTsBlockLineNumber =
+          TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
+
+      // case1: without group by, step is SINGLE
+      SeriesAggregationScanOperator seriesAggregationScanOperator1 =
+          createSeriesAggregationScanOperator(
+              instanceNotificationExecutor,
+              measurementPath,
+              firstValueAndCountDescriptors(measurementPath, AggregationStep.SINGLE),
+              null,
+              typeProvider);
+      long expectedMaxReturnSize =
+          TimeColumn.SIZE_IN_BYTES_PER_POSITION
+              + 512 * Byte.BYTES
+              + LongColumn.SIZE_IN_BYTES_PER_POSITION;
+      assertSeriesAggregationScanMemory(
+          seriesAggregationScanOperator1, expectedMaxReturnSize, expectedMaxRetainSize);
+
+      // case2: without group by, step is PARTIAL
+      SeriesAggregationScanOperator seriesAggregationScanOperator2 =
+          createSeriesAggregationScanOperator(
+              instanceNotificationExecutor,
+              measurementPath,
+              firstValueAndCountDescriptors(measurementPath, AggregationStep.PARTIAL),
+              null,
+              typeProvider);
+      expectedMaxReturnSize =
+          TimeColumn.SIZE_IN_BYTES_PER_POSITION
+              + 512 * Byte.BYTES
+              + 2 * LongColumn.SIZE_IN_BYTES_PER_POSITION;
+      assertSeriesAggregationScanMemory(
+          seriesAggregationScanOperator2, expectedMaxReturnSize, expectedMaxRetainSize);
+
+      // case3: with group by, total window num < 1000
+      GroupByTimeParameter groupByTimeParameter =
+          new GroupByTimeParameter(
+              0,
+              2 * maxTsBlockLineNumber,
+              maxTsBlockLineNumber / 100,
+              maxTsBlockLineNumber / 100,
+              true);
+      SeriesAggregationScanOperator seriesAggregationScanOperator3 =
+          createSeriesAggregationScanOperator(
+              instanceNotificationExecutor,
+              measurementPath,
+              firstValueAndCountDescriptors(measurementPath, AggregationStep.SINGLE),
+              groupByTimeParameter,
+              typeProvider);
+      expectedMaxReturnSize =
+          200
+              * (TimeColumn.SIZE_IN_BYTES_PER_POSITION
+                  + 512 * Byte.BYTES
+                  + LongColumn.SIZE_IN_BYTES_PER_POSITION);
+      assertSeriesAggregationScanMemory(
+          seriesAggregationScanOperator3, expectedMaxReturnSize, expectedMaxRetainSize);
+
+      // case4: with group by, total window num > 1000
+      groupByTimeParameter = new GroupByTimeParameter(0, 2 * maxTsBlockLineNumber, 1, 1, true);
+      SeriesAggregationScanOperator seriesAggregationScanOperator4 =
+          createSeriesAggregationScanOperator(
+              instanceNotificationExecutor,
+              measurementPath,
+              firstValueAndCountDescriptors(measurementPath, AggregationStep.SINGLE),
+              groupByTimeParameter,
+              typeProvider);
+      expectedMaxReturnSize =
+          maxTsBlockLineNumber
+              * (TimeColumn.SIZE_IN_BYTES_PER_POSITION
+                  + 512 * Byte.BYTES
+                  + LongColumn.SIZE_IN_BYTES_PER_POSITION);
+      assertSeriesAggregationScanMemory(
+          seriesAggregationScanOperator4, expectedMaxReturnSize, expectedMaxRetainSize);
+
+      // case5: over DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES
+      groupByTimeParameter = new GroupByTimeParameter(0, 2 * maxTsBlockLineNumber, 1, 1, true);
+      List<AggregationDescriptor> aggregationDescriptors5 =
+          Arrays.asList(
+              newSeriesAggregationDescriptor(
+                  AggregationType.FIRST_VALUE, AggregationStep.SINGLE, measurementPath),
+              newSeriesAggregationDescriptor(
+                  AggregationType.FIRST_VALUE, AggregationStep.SINGLE, measurementPath),
+              newSeriesAggregationDescriptor(
+                  AggregationType.FIRST_VALUE, AggregationStep.SINGLE, measurementPath));
+      SeriesAggregationScanOperator seriesAggregationScanOperator5 =
+          createSeriesAggregationScanOperator(
+              instanceNotificationExecutor,
+              measurementPath,
+              aggregationDescriptors5,
+              groupByTimeParameter,
+              typeProvider);
+      expectedMaxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+      assertSeriesAggregationScanMemory(
+          seriesAggregationScanOperator5, expectedMaxReturnSize, expectedMaxRetainSize);
+    } finally {
+      // Always release the notification thread, even when an assertion fails.
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /** Creates one descriptor for {@code aggregationType} over the single given series. */
+  private static AggregationDescriptor newSeriesAggregationDescriptor(
+      AggregationType aggregationType, AggregationStep step, MeasurementPath measurementPath) {
+    return new AggregationDescriptor(
+        aggregationType.name().toLowerCase(),
+        step,
+        Collections.singletonList(new TimeSeriesOperand(measurementPath)));
+  }
+
+  /** Creates the FIRST_VALUE + COUNT descriptor pair, in that order, with the given step. */
+  private static List<AggregationDescriptor> firstValueAndCountDescriptors(
+      MeasurementPath measurementPath, AggregationStep step) {
+    return Arrays.asList(
+        newSeriesAggregationDescriptor(AggregationType.FIRST_VALUE, step, measurementPath),
+        newSeriesAggregationDescriptor(AggregationType.COUNT, step, measurementPath));
+  }
+
+  /** Asserts peek memory (return + retained), max return size and retained size of a scan. */
+  private static void assertSeriesAggregationScanMemory(
+      SeriesAggregationScanOperator operator,
+      long expectedMaxReturnSize,
+      long expectedMaxRetainSize) {
+    assertEquals(expectedMaxReturnSize + expectedMaxRetainSize, operator.calculateMaxPeekMemory());
+    assertEquals(expectedMaxReturnSize, operator.calculateMaxReturnSize());
+    assertEquals(expectedMaxRetainSize, operator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * Builds a {@link SeriesAggregationScanOperator} backed by a stub fragment instance context.
+   *
+   * <p>One {@link Aggregator} is created per descriptor, using the series type of {@code
+   * measurementPath}. The max return size handed to the operator comes from {@code
+   * AggregationUtil.calculateMaxAggregationResultSize}.
+   *
+   * @param instanceNotificationExecutor executor passed to the fragment instance state machine
+   * @param measurementPath series the operator scans
+   * @param aggregationDescriptors aggregations to compute, one aggregator per entry
+   * @param groupByTimeParameter group-by-time settings used to build the time range iterator and
+   *     passed on to the operator; callers in this file pass {@code null} for "no group by"
+   * @param typeProvider type information used when computing the max result size
+   * @return the configured operator
+   * @throws IllegalPathException declared for callers; presumably raised when a construction step
+   *     rejects the path
+   */
+  private SeriesAggregationScanOperator createSeriesAggregationScanOperator(
+      ExecutorService instanceNotificationExecutor,
+      MeasurementPath measurementPath,
+      List<AggregationDescriptor> aggregationDescriptors,
+      GroupByTimeParameter groupByTimeParameter,
+      TypeProvider typeProvider)
+      throws IllegalPathException {
+    // Stub execution context holding a single operator context.
+    QueryId stubQueryId = new QueryId("stub_query");
+    FragmentInstanceId fragmentId =
+        new FragmentInstanceId(new PlanFragmentId(stubQueryId, 0), "stub-instance");
+    FragmentInstanceContext instanceContext =
+        createFragmentInstanceContext(
+            fragmentId, new FragmentInstanceStateMachine(fragmentId, instanceNotificationExecutor));
+    PlanNodeId scanNodeId = new PlanNodeId("1");
+    instanceContext.addOperatorContext(1, scanNodeId, SeriesScanOperator.class.getSimpleName());
+
+    // One aggregator per descriptor, keeping the descriptor order.
+    List<Aggregator> aggregatorList = new ArrayList<>();
+    for (AggregationDescriptor descriptor : aggregationDescriptors) {
+      aggregatorList.add(
+          new Aggregator(
+              AccumulatorFactory.createAccumulator(
+                  descriptor.getAggregationType(), measurementPath.getSeriesType(), true),
+              descriptor.getStep()));
+    }
+
+    ITimeRangeIterator rangeIterator = initTimeRangeIterator(groupByTimeParameter, true, true);
+    long resultSizeLimit =
+        AggregationUtil.calculateMaxAggregationResultSize(
+            aggregationDescriptors, rangeIterator, typeProvider);
+    Set<String> sensorNames = Sets.newHashSet("s1");
+
+    return new SeriesAggregationScanOperator(
+        scanNodeId,
+        measurementPath,
+        sensorNames,
+        instanceContext.getOperatorContexts().get(0),
+        aggregatorList,
+        rangeIterator,
+        null,
+        true,
+        groupByTimeParameter,
+        resultSizeLimit);
+  }
+
+  /**
+   * Checks the memory estimates of a {@link RawDataAggregationOperator} that computes FIRST_VALUE
+   * and COUNT (FINAL step) over a mocked child with a group-by-time parameter of {@code (0, 1000,
+   * 10, 10, true)}.
+   *
+   * <p>The retained size is expected to be the child's max return size plus the child's retained
+   * size, and the peek memory is expected to be the max return size on top of that.
+   *
+   * @throws IllegalPathException declared because the stub path and result-size calculation may
+   *     throw it
+   */
+  @Test
+  public void rawDataAggregationOperatorTest() throws IllegalPathException {
+    // The child reports fixed, stubbed estimates.
+    Operator mockChild = Mockito.mock(Operator.class);
+    Mockito.when(mockChild.calculateMaxPeekMemory()).thenReturn(2048L);
+    Mockito.when(mockChild.calculateMaxReturnSize()).thenReturn(1024L);
+    Mockito.when(mockChild.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
+
+    MeasurementPath seriesPath = new MeasurementPath("root.sg.d1.s1", TSDataType.TEXT);
+    TypeProvider outputTypes = new TypeProvider();
+    outputTypes.setType("count(root.sg.d1.s1)", TSDataType.INT64);
+    outputTypes.setType("first_value(root.sg.d1.s1)", TSDataType.TEXT);
+
+    AggregationDescriptor firstValueDescriptor =
+        new AggregationDescriptor(
+            AggregationType.FIRST_VALUE.name().toLowerCase(),
+            AggregationStep.FINAL,
+            Collections.singletonList(new TimeSeriesOperand(seriesPath)));
+    AggregationDescriptor countDescriptor =
+        new AggregationDescriptor(
+            AggregationType.COUNT.name().toLowerCase(),
+            AggregationStep.FINAL,
+            Collections.singletonList(new TimeSeriesOperand(seriesPath)));
+    List<AggregationDescriptor> descriptors =
+        Arrays.asList(firstValueDescriptor, countDescriptor);
+
+    // One aggregator per descriptor, keeping the descriptor order.
+    List<Aggregator> aggregatorList = new ArrayList<>();
+    for (AggregationDescriptor descriptor : descriptors) {
+      aggregatorList.add(
+          new Aggregator(
+              AccumulatorFactory.createAccumulator(
+                  descriptor.getAggregationType(), seriesPath.getSeriesType(), true),
+              descriptor.getStep()));
+    }
+
+    GroupByTimeParameter groupBy = new GroupByTimeParameter(0, 1000, 10, 10, true);
+    ITimeRangeIterator rangeIterator = initTimeRangeIterator(groupBy, true, false);
+    long resultSizeLimit =
+        AggregationUtil.calculateMaxAggregationResultSize(descriptors, rangeIterator, outputTypes);
+
+    RawDataAggregationOperator aggregationOperator =
+        new RawDataAggregationOperator(
+            Mockito.mock(OperatorContext.class),
+            aggregatorList,
+            rangeIterator,
+            mockChild,
+            true,
+            resultSizeLimit);
+
+    long bytesPerWindow =
+        512 * Byte.BYTES
+            + TimeColumn.SIZE_IN_BYTES_PER_POSITION
+            + LongColumn.SIZE_IN_BYTES_PER_POSITION;
+    long returnExpectation = 100 * bytesPerWindow;
+    long retainExpectation = mockChild.calculateMaxReturnSize();
+
+    assertEquals(
+        returnExpectation + retainExpectation + mockChild.calculateRetainedSizeAfterCallingNext(),
+        aggregationOperator.calculateMaxPeekMemory());
+    assertEquals(returnExpectation, aggregationOperator.calculateMaxReturnSize());
+    assertEquals(
+        retainExpectation + mockChild.calculateRetainedSizeAfterCallingNext(),
+        aggregationOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  @Test
+  public void slidingWindowAggregationOperatorTest() throws IllegalPathException { // Checks the three memory estimates reported by SlidingWindowAggregationOperator over one mocked child.
+    Operator child = Mockito.mock(Operator.class); // stubbed child with fixed memory figures
+    Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L);
+    Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+    Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
+
+    MeasurementPath measurementPath = new MeasurementPath("root.sg.d1.s1", TSDataType.TEXT);
+    TypeProvider typeProvider = new TypeProvider(); // output types consulted when sizing the result columns
+    typeProvider.setType("count(root.sg.d1.s1)", TSDataType.INT64);
+    typeProvider.setType("first_value(root.sg.d1.s1)", TSDataType.TEXT);
+
+    List<AggregationDescriptor> aggregationDescriptors = // first_value and count over the same series, both at FINAL step
+        Arrays.asList(
+            new AggregationDescriptor(
+                AggregationType.FIRST_VALUE.name().toLowerCase(),
+                AggregationStep.FINAL,
+                Collections.singletonList(new TimeSeriesOperand(measurementPath))),
+            new AggregationDescriptor(
+                AggregationType.COUNT.name().toLowerCase(),
+                AggregationStep.FINAL,
+                Collections.singletonList(new TimeSeriesOperand(measurementPath))));
+
+    List<Aggregator> aggregators = new ArrayList<>(); // one Aggregator per descriptor, in descriptor order
+    aggregationDescriptors.forEach(
+        o ->
+            aggregators.add(
+                new Aggregator(
+                    AccumulatorFactory.createAccumulator(
+                        o.getAggregationType(), measurementPath.getSeriesType(), true),
+                    o.getStep())));
+
+    GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 1000, 10, 5, true); // presumably range 0..1000, interval 10, sliding step 5, i.e. 200 windows - TODO confirm argument order
+    ITimeRangeIterator timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, true, false);
+    long maxReturnSize = // size estimate under test, handed to the operator's constructor below
+        AggregationUtil.calculateMaxAggregationResultSize(
+            aggregationDescriptors, timeRangeIterator, typeProvider);
+
+    SlidingWindowAggregationOperator slidingWindowAggregationOperator =
+        new SlidingWindowAggregationOperator(
+            Mockito.mock(OperatorContext.class),
+            aggregators,
+            timeRangeIterator,
+            child,
+            true,
+            groupByTimeParameter,
+            maxReturnSize);
+
+    long expectedMaxReturnSize = // value expected from calculateMaxReturnSize()
+        200 // presumably the number of overlapping windows produced by the parameter above
+            * (512 * Byte.BYTES // presumably the per-value size assumed for the TEXT first_value column - confirm in AggregationUtil
+                + TimeColumn.SIZE_IN_BYTES_PER_POSITION
+                + LongColumn.SIZE_IN_BYTES_PER_POSITION);
+    long expectedMaxRetainSize = child.calculateMaxReturnSize(); // 1024L from the stub above
+
+    assertEquals( // peak memory = own max return size + child's max return size + child's retained size
+        expectedMaxReturnSize
+            + expectedMaxRetainSize
+            + child.calculateRetainedSizeAfterCallingNext(),
+        slidingWindowAggregationOperator.calculateMaxPeekMemory());
+    assertEquals(expectedMaxReturnSize, slidingWindowAggregationOperator.calculateMaxReturnSize());
+    assertEquals( // retained size = child's max return size + child's retained size
+        expectedMaxRetainSize + child.calculateRetainedSizeAfterCallingNext(),
+        slidingWindowAggregationOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  @Test
+  public void aggregationOperatorTest() throws IllegalPathException { // Checks the three memory estimates reported by AggregationOperator over four mocked children.
+    List<Operator> children = new ArrayList<>(4);
+    long expectedChildrenRetainedSize = 0L; // running totals over all stubbed children
+    long expectedMaxRetainSize = 0L;
+    for (int i = 0; i < 4; i++) {
+      Operator child = Mockito.mock(Operator.class); // stubbed child with fixed memory figures
+      Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+      Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+      Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(64 * 1024L);
+      expectedChildrenRetainedSize += 64 * 1024L; // presumably mirrors the stubbed calculateRetainedSizeAfterCallingNext()
+      expectedMaxRetainSize += 64 * 1024L; // presumably mirrors the stubbed calculateMaxReturnSize(), as in the single-child tests
+      children.add(child);
+    }
+
+    MeasurementPath measurementPath = new MeasurementPath("root.sg.d1.s1", TSDataType.TEXT);
+    TypeProvider typeProvider = new TypeProvider(); // output types consulted when sizing the result columns
+    typeProvider.setType("count(root.sg.d1.s1)", TSDataType.INT64);
+    typeProvider.setType("first_value(root.sg.d1.s1)", TSDataType.TEXT);
+
+    List<AggregationDescriptor> aggregationDescriptors = // first_value and count over the same series, both at FINAL step
+        Arrays.asList(
+            new AggregationDescriptor(
+                AggregationType.FIRST_VALUE.name().toLowerCase(),
+                AggregationStep.FINAL,
+                Collections.singletonList(new TimeSeriesOperand(measurementPath))),
+            new AggregationDescriptor(
+                AggregationType.COUNT.name().toLowerCase(),
+                AggregationStep.FINAL,
+                Collections.singletonList(new TimeSeriesOperand(measurementPath))));
+
+    List<Aggregator> aggregators = new ArrayList<>(); // one Aggregator per descriptor, in descriptor order
+    aggregationDescriptors.forEach(
+        o ->
+            aggregators.add(
+                new Aggregator(
+                    AccumulatorFactory.createAccumulator(
+                        o.getAggregationType(), measurementPath.getSeriesType(), true),
+                    o.getStep())));
+
+    GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 1000, 10, 10, true); // presumably range 0..1000, interval 10, step 10, i.e. 100 windows - TODO confirm argument order
+    ITimeRangeIterator timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, true, false);
+    long maxReturnSize = // size estimate under test, handed to the operator's constructor below
+        AggregationUtil.calculateMaxAggregationResultSize(
+            aggregationDescriptors, timeRangeIterator, typeProvider);
+
+    AggregationOperator aggregationOperator =
+        new AggregationOperator(
+            Mockito.mock(OperatorContext.class),
+            aggregators,
+            timeRangeIterator,
+            children,
+            maxReturnSize);
+
+    long expectedMaxReturnSize = // value expected from calculateMaxReturnSize()
+        100 // presumably the number of time windows produced by the parameter above
+            * (512 * Byte.BYTES // presumably the per-value size assumed for the TEXT first_value column - confirm in AggregationUtil
+                + TimeColumn.SIZE_IN_BYTES_PER_POSITION
+                + LongColumn.SIZE_IN_BYTES_PER_POSITION);
+
+    assertEquals( // peak memory = own max return size + both per-child totals accumulated above
+        expectedMaxReturnSize + expectedMaxRetainSize + expectedChildrenRetainedSize,
+        aggregationOperator.calculateMaxPeekMemory());
+    assertEquals(expectedMaxReturnSize, aggregationOperator.calculateMaxReturnSize());
+    assertEquals( // retained size = both per-child totals, without the operator's own return size
+        expectedMaxRetainSize + expectedChildrenRetainedSize,
+        aggregationOperator.calculateRetainedSizeAfterCallingNext());
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
index 6a9902f24b..f25b0d2fcd 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
@@ -64,6 +64,8 @@ import java.util.concurrent.ExecutorService;
 
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static org.junit.Assert.assertEquals;
 
 public class RawDataAggregationOperatorTest {
@@ -381,8 +383,9 @@ public class RawDataAggregationOperatorTest {
     return new RawDataAggregationOperator(
         fragmentInstanceContext.getOperatorContexts().get(3),
         aggregators,
+        initTimeRangeIterator(groupByTimeParameter, true, true),
         timeJoinOperator,
         true,
-        groupByTimeParameter);
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
index 71fd167404..f3b37b83f7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
@@ -59,6 +59,8 @@ import java.util.concurrent.ExecutorService;
 
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static org.junit.Assert.assertEquals;
 
 public class SeriesAggregationScanOperatorTest {
@@ -515,9 +517,11 @@ public class SeriesAggregationScanOperatorTest {
             allSensors,
             fragmentInstanceContext.getOperatorContexts().get(0),
             aggregators,
+            initTimeRangeIterator(groupByTimeParameter, ascending, true),
             timeFilter,
             ascending,
-            groupByTimeParameter);
+            groupByTimeParameter,
+            DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
     seriesAggregationScanOperator.initQueryDataSource(
         new QueryDataSource(seqResources, unSeqResources));
     return seriesAggregationScanOperator;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
index 81ab99e1ff..a0ed242e63 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
@@ -61,6 +61,8 @@ import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 public class SlidingWindowAggregationOperatorTest {
 
@@ -232,9 +234,11 @@ public class SlidingWindowAggregationOperatorTest {
             Collections.singleton("sensor0"),
             fragmentInstanceContext.getOperatorContexts().get(0),
             aggregators,
+            initTimeRangeIterator(groupByTimeParameter, ascending, true),
             null,
             ascending,
-            groupByTimeParameter);
+            groupByTimeParameter,
+            DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
     seriesAggregationScanOperator.initQueryDataSource(
         new QueryDataSource(seqResources, unSeqResources));
 
@@ -254,8 +258,10 @@ public class SlidingWindowAggregationOperatorTest {
     return new SlidingWindowAggregationOperator(
         fragmentInstanceContext.getOperatorContexts().get(1),
         finalAggregators,
+        initTimeRangeIterator(groupByTimeParameter, ascending, false),
         seriesAggregationScanOperator,
         ascending,
-        groupByTimeParameter);
+        groupByTimeParameter,
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
index 34b9b8fa87..bff57278b1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
@@ -56,6 +56,8 @@ import java.util.concurrent.ExecutorService;
 
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -211,9 +213,11 @@ public class UpdateLastCacheOperatorTest {
             allSensors,
             fragmentInstanceContext.getOperatorContexts().get(0),
             aggregators,
+            initTimeRangeIterator(groupByTimeParameter, ascending, true),
             timeFilter,
             ascending,
-            groupByTimeParameter);
+            groupByTimeParameter,
+            DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
     seriesAggregationScanOperator.initQueryDataSource(
         new QueryDataSource(seqResources, unSeqResources));
 
diff --git a/server/src/test/java/org/apache/iotdb/db/rescon/ResourceManagerTest.java b/server/src/test/java/org/apache/iotdb/db/rescon/ResourceManagerTest.java
index 15da188fa1..4eb277d663 100644
--- a/server/src/test/java/org/apache/iotdb/db/rescon/ResourceManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/rescon/ResourceManagerTest.java
@@ -74,15 +74,14 @@ public class ResourceManagerTest {
   List<TsFileResource> unseqResources = new ArrayList<>();
 
   private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
-  private TsFileResourceManager tsFileResourceManager = TsFileResourceManager.getInstance();
-  private double prevTimeIndexMemoryProportion;
-  private double prevTimeIndexMemoryThreshold;
+  private final TsFileResourceManager tsFileResourceManager = TsFileResourceManager.getInstance();
+  private long prevTimeIndexMemoryThreshold;
   private TimeIndexLevel timeIndexLevel;
 
   @Before
   public void setUp() throws IOException, WriteProcessException, MetadataException {
     IoTDB.configManager.init();
-    prevTimeIndexMemoryProportion = CONFIG.getTimeIndexMemoryProportion();
+    prevTimeIndexMemoryThreshold = CONFIG.getAllocateMemoryForTimeIndex();
     timeIndexLevel = CONFIG.getTimeIndexLevel();
     prepareSeries();
   }
@@ -92,10 +91,7 @@ public class ResourceManagerTest {
     removeFiles();
     seqResources.clear();
     unseqResources.clear();
-    CONFIG.setTimeIndexMemoryProportion(prevTimeIndexMemoryProportion);
     CONFIG.setTimeIndexLevel(String.valueOf(timeIndexLevel));
-    prevTimeIndexMemoryThreshold =
-        prevTimeIndexMemoryProportion * CONFIG.getAllocateMemoryForRead();
     tsFileResourceManager.setTimeIndexMemoryThreshold(prevTimeIndexMemoryThreshold);
     ChunkCache.getInstance().clear();
     TimeSeriesMetadataCache.getInstance().clear();
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 7827dae848..2eb1ef7a52 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -95,6 +95,8 @@ public enum TSStatusCode {
   UNSUPPORTED_INDEX_FUNC_ERROR(421),
   UNSUPPORTED_INDEX_TYPE_ERROR(422),
 
+  MEMORY_NOT_ENOUGH(423),
+
   INTERNAL_SERVER_ERROR(500),
   CLOSE_OPERATION_ERROR(501),
   READ_ONLY_SYSTEM_ERROR(502),