You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/09 11:20:00 UTC

[iotdb] branch MemoryControl updated: Add UT

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch MemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/MemoryControl by this push:
     new ff861dd17d Add UT
ff861dd17d is described below

commit ff861dd17d70adae894f80fde3a30a85fae0cd63
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Aug 9 19:19:51 2022 +0800

    Add UT
---
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   6 +-
 .../db/mpp/exception/MemoryNotEnoughException.java |  28 ++
 .../fragment/FragmentInstanceContext.java          |   4 +
 .../process/last/LastQueryMergeOperator.java       |   6 +-
 .../process/last/LastQuerySortOperator.java        |   7 +-
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |  47 ++-
 .../mpp/execution/operator/OperatorMemoryTest.java | 424 +++++++++++++++++++++
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   2 +
 8 files changed, 515 insertions(+), 9 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 72f48793dd..e6c7825969 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1501,8 +1501,12 @@ public class IoTDBDescriptor {
               maxMemoryAvailable * Integer.parseInt(proportions[1].trim()) / proportionSum);
           conf.setAllocateMemoryForTimeSeriesMetaDataCache(
               maxMemoryAvailable * Integer.parseInt(proportions[2].trim()) / proportionSum);
-          conf.setAllocateMemoryForReadWithoutCache(
+          conf.setAllocateMemoryForCoordinator(
               maxMemoryAvailable * Integer.parseInt(proportions[3].trim()) / proportionSum);
+          conf.setAllocateMemoryForOperators(
+              maxMemoryAvailable * Integer.parseInt(proportions[4].trim()) / proportionSum);
+          conf.setAllocateMemoryForDataExchange(
+              maxMemoryAvailable * Integer.parseInt(proportions[5].trim()) / proportionSum);
         } catch (Exception e) {
           throw new RuntimeException(
               "Each subsection of configuration item chunkmeta_chunk_timeseriesmeta_free_memory_proportion"
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/exception/MemoryNotEnoughException.java b/server/src/main/java/org/apache/iotdb/db/mpp/exception/MemoryNotEnoughException.java
new file mode 100644
index 0000000000..7c5f4803e3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/exception/MemoryNotEnoughException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.exception;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+
+public class MemoryNotEnoughException extends IoTDBException {
+
+  public MemoryNotEnoughException(String message, int errorCode) {
+    super(message, errorCode);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index aff2d66a41..4d0f9d9ffd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -179,4 +179,8 @@ public class FragmentInstanceContext extends QueryContext {
   public long getEndTime() {
     return executionEndTime.get();
   }
+
+  public FragmentInstanceStateMachine getStateMachine() {
+    return stateMachine;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
index 9d8be1e23d..af6e7b4bf7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
@@ -42,7 +42,7 @@ import static org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryU
 // time-series
 public class LastQueryMergeOperator implements ProcessOperator {
 
-  private static final int MAP_NODE_RETRAINED_SIZE = 16;
+  public static final long MAP_NODE_RETRAINED_SIZE = 16L + Location.INSTANCE_SIZE;
 
   private final OperatorContext operatorContext;
 
@@ -229,7 +229,7 @@ public class LastQueryMergeOperator implements ProcessOperator {
     long maxPeekMemory =
         calculateMaxReturnSize()
             + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()
-                * (Location.INSTANCE_SIZE + MAP_NODE_RETRAINED_SIZE);
+                * MAP_NODE_RETRAINED_SIZE;
     long childrenMaxPeekMemory = 0;
     for (Operator child : children) {
       maxPeekMemory += child.calculateMaxReturnSize();
@@ -270,7 +270,7 @@ public class LastQueryMergeOperator implements ProcessOperator {
 
   private static class Location {
 
-    private static final long INSTANCE_SIZE = ClassLayout.parseClass(Location.class).instanceSize();
+    private static final int INSTANCE_SIZE = ClassLayout.parseClass(Location.class).instanceSize();
     int tsBlockIndex;
     int rowIndex;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
index 1b56d844c1..e40da67999 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
@@ -193,13 +193,14 @@ public class LastQuerySortOperator implements ProcessOperator {
 
   @Override
   public long calculateMaxPeekMemory() {
-    long maxPeekMemory =
-        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + tsBlockBuilder.getRetainedSizeInBytes();
+    long maxPeekMemory = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + cachedTsBlock.getRetainedSizeInBytes();
     long maxChildrenReturnSize = 0;
+    long maxChildrenPeekMemory = 0;
     for (Operator child : children) {
       maxChildrenReturnSize = Math.max(maxChildrenReturnSize, child.calculateMaxReturnSize());
+      maxChildrenPeekMemory = Math.max(maxChildrenPeekMemory, child.calculateMaxPeekMemory());
     }
-    return maxPeekMemory;
+    return Math.max(maxPeekMemory + maxChildrenReturnSize, maxChildrenPeekMemory);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 8188612e61..451cc208d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -18,17 +18,22 @@
  */
 package org.apache.iotdb.db.mpp.plan.planner;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
+import org.apache.iotdb.db.mpp.exception.MemoryNotEnoughException;
 import org.apache.iotdb.db.mpp.execution.driver.DataDriver;
 import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
 import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
 import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.timer.ITimeSliceAllocator;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 /**
@@ -38,6 +43,10 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
  */
 public class LocalExecutionPlanner {
 
+  /** free memory remaining for operator execution; starts at the configured allocation */
+  private long freeMemoryForOperators =
+      IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForOperators();
+
   public static LocalExecutionPlanner getInstance() {
     return InstanceHolder.INSTANCE;
   }
@@ -47,11 +56,15 @@ public class LocalExecutionPlanner {
       TypeProvider types,
       FragmentInstanceContext instanceContext,
       Filter timeFilter,
-      DataRegion dataRegion) {
+      DataRegion dataRegion)
+      throws MemoryNotEnoughException {
     LocalExecutionPlanContext context = new LocalExecutionPlanContext(types, instanceContext);
 
     Operator root = plan.accept(new OperatorTreeGenerator(), context);
 
+    // check whether current free memory is enough to execute current query
+    checkMemory(root, instanceContext.getStateMachine());
+
     ITimeSliceAllocator timeSliceAllocator = context.getTimeSliceAllocator();
     instanceContext
         .getOperatorContexts()
@@ -71,7 +84,8 @@ public class LocalExecutionPlanner {
   }
 
   public SchemaDriver plan(
-      PlanNode plan, FragmentInstanceContext instanceContext, ISchemaRegion schemaRegion) {
+      PlanNode plan, FragmentInstanceContext instanceContext, ISchemaRegion schemaRegion)
+      throws MemoryNotEnoughException {
 
     SchemaDriverContext schemaDriverContext =
         new SchemaDriverContext(instanceContext, schemaRegion);
@@ -81,6 +95,9 @@ public class LocalExecutionPlanner {
 
     Operator root = plan.accept(new OperatorTreeGenerator(), context);
 
+    // check whether current free memory is enough to execute current query
+    checkMemory(root, instanceContext.getStateMachine());
+
     ITimeSliceAllocator timeSliceAllocator = context.getTimeSliceAllocator();
     instanceContext
         .getOperatorContexts()
@@ -91,6 +108,32 @@ public class LocalExecutionPlanner {
     return new SchemaDriver(root, context.getSinkHandle(), schemaDriverContext);
   }
 
+  private void checkMemory(Operator root, FragmentInstanceStateMachine stateMachine)
+      throws MemoryNotEnoughException {
+    long estimatedMemorySize = root.calculateMaxPeekMemory();
+
+    synchronized (this) {
+      if (estimatedMemorySize > freeMemoryForOperators) {
+        throw new MemoryNotEnoughException(
+            String.format(
+                "There is not enough memory to execute current fragment instance, current remaining free memory is %d, estimated memory usage for current fragment instance is %d",
+                freeMemoryForOperators, estimatedMemorySize),
+            TSStatusCode.MEMORY_NOT_ENOUGH.getStatusCode());
+      } else {
+        freeMemoryForOperators -= estimatedMemorySize;
+      }
+    }
+
+    stateMachine.addStateChangeListener(
+        newState -> {
+          if (newState == FragmentInstanceState.FLUSHING || newState.isDone()) {
+            synchronized (this) {
+              this.freeMemoryForOperators += estimatedMemorySize;
+            }
+          }
+        });
+  }
+
   private static class InstanceHolder {
 
     private InstanceHolder() {}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
new file mode 100644
index 0000000000..b57ee14dcf
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.SortOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.RowBasedTimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryCollectOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQuerySortOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.UpdateLastCacheOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+import com.google.common.collect.Sets;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryMergeOperator.MAP_NODE_RETRAINED_SIZE;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class OperatorMemoryTest {
+
+  @Test
+  public void seriesScanOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      MeasurementPath measurementPath =
+          new MeasurementPath("root.SeriesScanOperatorTest.device0.sensor0", TSDataType.INT32);
+      Set<String> allSensors = Sets.newHashSet("sensor0");
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, SeriesScanOperator.class.getSimpleName());
+
+      SeriesScanOperator seriesScanOperator =
+          new SeriesScanOperator(
+              planNodeId,
+              measurementPath,
+              allSensors,
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              null,
+              null,
+              true);
+
+      assertEquals(
+          TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
+          seriesScanOperator.calculateMaxPeekMemory());
+      assertEquals(
+          TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
+          seriesScanOperator.calculateMaxReturnSize());
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void alignedSeriesScanOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      AlignedPath alignedPath =
+          new AlignedPath(
+              "root.AlignedSeriesScanOperatorTest.device0",
+              Arrays.asList("sensor0", "sensor1", "sensor2"));
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, AlignedSeriesScanOperator.class.getSimpleName());
+
+      AlignedSeriesScanOperator seriesScanOperator =
+          new AlignedSeriesScanOperator(
+              planNodeId,
+              alignedPath,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              null,
+              null,
+              true);
+
+      assertEquals(
+          4 * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
+          seriesScanOperator.calculateMaxPeekMemory());
+      assertEquals(
+          4 * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
+          seriesScanOperator.calculateMaxReturnSize());
+
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void exchangeOperatorTest() {
+    ExchangeOperator exchangeOperator = new ExchangeOperator(null, null, null);
+
+    assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, exchangeOperator.calculateMaxPeekMemory());
+    assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, exchangeOperator.calculateMaxReturnSize());
+  }
+
+  @Test
+  public void lastCacheScanOperatorTest() {
+    TsBlock tsBlock = Mockito.mock(TsBlock.class);
+    Mockito.when(tsBlock.getRetainedSizeInBytes()).thenReturn(1024L);
+    LastCacheScanOperator lastCacheScanOperator = new LastCacheScanOperator(null, null, tsBlock);
+
+    assertEquals(1024, lastCacheScanOperator.calculateMaxPeekMemory());
+    assertEquals(1024, lastCacheScanOperator.calculateMaxReturnSize());
+  }
+
+  @Test
+  public void fillOperatorTest() {
+    Operator child = Mockito.mock(Operator.class);
+    Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L);
+    Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+
+    FillOperator fillOperator =
+        new FillOperator(Mockito.mock(OperatorContext.class), new IFill[] {null, null}, child);
+
+    assertEquals(2048 * 2, fillOperator.calculateMaxPeekMemory());
+    assertEquals(1024, fillOperator.calculateMaxReturnSize());
+  }
+
+  @Test
+  public void lastQueryCollectOperatorTest() {
+    List<Operator> children = new ArrayList<>(4);
+    Random random = new Random();
+    long expectedMaxPeekMemory = 0;
+    long expectedMaxReturnSize = 0;
+    for (int i = 0; i < 4; i++) {
+      Operator child = Mockito.mock(Operator.class);
+      long currentMaxPeekMemory = random.nextInt(1024) + 1024;
+      long currentMaxReturnSize = random.nextInt(1024);
+      Mockito.when(child.calculateMaxPeekMemory()).thenReturn(currentMaxPeekMemory);
+      Mockito.when(child.calculateMaxReturnSize()).thenReturn(currentMaxReturnSize);
+      children.add(child);
+      expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, currentMaxPeekMemory);
+      expectedMaxReturnSize = Math.max(expectedMaxReturnSize, currentMaxReturnSize);
+    }
+    LastQueryCollectOperator lastQueryCollectOperator =
+        new LastQueryCollectOperator(Mockito.mock(OperatorContext.class), children);
+
+    assertEquals(expectedMaxPeekMemory, lastQueryCollectOperator.calculateMaxPeekMemory());
+    assertEquals(expectedMaxReturnSize, lastQueryCollectOperator.calculateMaxReturnSize());
+  }
+
+  @Test
+  public void lastQueryMergeOperatorTest() {
+    List<Operator> children = new ArrayList<>(4);
+    Random random = new Random();
+    long expectedMaxPeekMemory = 0;
+    long expectedMaxReturnSize = 0;
+    long childSumReturnSize = 0;
+    for (int i = 0; i < 4; i++) {
+      Operator child = Mockito.mock(Operator.class);
+      long currentMaxPeekMemory = random.nextInt(1024) + 1024;
+      long currentMaxReturnSize = random.nextInt(1024);
+      childSumReturnSize += currentMaxReturnSize;
+      Mockito.when(child.calculateMaxPeekMemory()).thenReturn(currentMaxPeekMemory);
+      Mockito.when(child.calculateMaxReturnSize()).thenReturn(currentMaxReturnSize);
+      children.add(child);
+      expectedMaxReturnSize = Math.max(expectedMaxReturnSize, currentMaxReturnSize);
+      expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, currentMaxPeekMemory);
+    }
+    // we need to cache all the TsBlocks of children and then return a new TsBlock as result,
+    // whose max possible size should equal the max return size among all its children; we
+    // should also take TreeMap memory into account.
+    expectedMaxPeekMemory =
+        Math.max(
+            expectedMaxPeekMemory,
+            childSumReturnSize
+                + expectedMaxReturnSize
+                + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()
+                    * MAP_NODE_RETRAINED_SIZE);
+
+    LastQueryMergeOperator lastQueryMergeOperator =
+        new LastQueryMergeOperator(
+            Mockito.mock(OperatorContext.class), children, Comparator.naturalOrder());
+
+    assertEquals(expectedMaxPeekMemory, lastQueryMergeOperator.calculateMaxPeekMemory());
+    assertEquals(expectedMaxReturnSize, lastQueryMergeOperator.calculateMaxReturnSize());
+  }
+
+  @Test
+  public void lastQueryOperatorTest() {
+    TsBlockBuilder builder = Mockito.mock(TsBlockBuilder.class);
+    Mockito.when(builder.getRetainedSizeInBytes()).thenReturn(1024L);
+    List<UpdateLastCacheOperator> children = new ArrayList<>(4);
+    long expectedMaxPeekMemory = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    long expectedMaxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    for (int i = 0; i < 4; i++) {
+      UpdateLastCacheOperator child = Mockito.mock(UpdateLastCacheOperator.class);
+      Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2 * 1024 * 1024L);
+      Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+      children.add(child);
+      expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, 2 * 1024 * 1024L);
+      expectedMaxReturnSize = Math.max(expectedMaxReturnSize, 1024L);
+    }
+    LastQueryOperator lastQueryOperator =
+        new LastQueryOperator(Mockito.mock(OperatorContext.class), children, builder);
+
+    assertEquals(expectedMaxPeekMemory, lastQueryOperator.calculateMaxPeekMemory());
+    assertEquals(expectedMaxReturnSize, lastQueryOperator.calculateMaxReturnSize());
+
+    Mockito.when(builder.getRetainedSizeInBytes()).thenReturn(4 * 1024 * 1024L);
+    assertEquals(4 * 1024 * 1024L, lastQueryOperator.calculateMaxPeekMemory());
+    assertEquals(4 * 1024 * 1024L, lastQueryOperator.calculateMaxReturnSize());
+  }
+
+  @Test
+  public void lastQuerySortOperatorTest() {
+    TsBlock tsBlock = Mockito.mock(TsBlock.class);
+    Mockito.when(tsBlock.getRetainedSizeInBytes()).thenReturn(16 * 1024L);
+    Mockito.when(tsBlock.getPositionCount()).thenReturn(16);
+    List<UpdateLastCacheOperator> children = new ArrayList<>(4);
+    long expectedMaxPeekMemory =
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + tsBlock.getRetainedSizeInBytes();
+    for (int i = 0; i < 4; i++) {
+      UpdateLastCacheOperator child = Mockito.mock(UpdateLastCacheOperator.class);
+      Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2 * 1024L);
+      Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+      children.add(child);
+    }
+
+    expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory + 1024, 2 * 1024);
+
+    LastQuerySortOperator lastQuerySortOperator =
+        new LastQuerySortOperator(
+            Mockito.mock(OperatorContext.class), tsBlock, children, Comparator.naturalOrder());
+
+    assertEquals(expectedMaxPeekMemory, lastQuerySortOperator.calculateMaxPeekMemory());
+    assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, lastQuerySortOperator.calculateMaxReturnSize());
+  }
+
+  @Test
+  public void limitOperatorTest() {
+    Operator child = Mockito.mock(Operator.class);
+    Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2 * 1024L);
+    Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+
+    LimitOperator limitOperator =
+        new LimitOperator(Mockito.mock(OperatorContext.class), 100, child);
+
+    assertEquals(2 * 1024L, limitOperator.calculateMaxPeekMemory());
+    assertEquals(1024, limitOperator.calculateMaxReturnSize());
+  }
+
+  @Test
+  public void offsetOperatorTest() {
+    Operator child = Mockito.mock(Operator.class);
+    Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2 * 1024L);
+    Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+
+    OffsetOperator offsetOperator =
+        new OffsetOperator(Mockito.mock(OperatorContext.class), 100, child);
+
+    assertEquals(2 * 1024L, offsetOperator.calculateMaxPeekMemory());
+    assertEquals(1024, offsetOperator.calculateMaxReturnSize());
+  }
+
+  @Test
+  public void rowBasedTimeJoinOperatorTest() {
+    List<Operator> children = new ArrayList<>(4);
+    List<TSDataType> dataTypeList = new ArrayList<>(2);
+    dataTypeList.add(TSDataType.INT32);
+    dataTypeList.add(TSDataType.INT32);
+    long expectedMaxReturnSize =
+        3L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    long expectedMaxPeekMemory = expectedMaxReturnSize;
+    long childrenMaxPeekMemory = 0;
+
+    for (int i = 0; i < 4; i++) {
+      Operator child = Mockito.mock(Operator.class);
+      Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+      Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+      expectedMaxPeekMemory += 64 * 1024L;
+      childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+      children.add(child);
+    }
+
+    expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, childrenMaxPeekMemory);
+
+    RowBasedTimeJoinOperator rowBasedTimeJoinOperator =
+        new RowBasedTimeJoinOperator(
+            Mockito.mock(OperatorContext.class), children, Ordering.ASC, dataTypeList, null, null);
+
+    assertEquals(expectedMaxPeekMemory, rowBasedTimeJoinOperator.calculateMaxPeekMemory());
+    assertEquals(expectedMaxReturnSize, rowBasedTimeJoinOperator.calculateMaxReturnSize());
+  }
+
+  @Test
+  public void sortOperatorTest() {
+    SortOperator sortOperator = new SortOperator();
+    assertEquals(0, sortOperator.calculateMaxPeekMemory());
+    assertEquals(0, sortOperator.calculateMaxReturnSize());
+  }
+
+  @Test
+  public void timeJoinOperatorTest() {
+    List<Operator> children = new ArrayList<>(4);
+    List<TSDataType> dataTypeList = new ArrayList<>(2);
+    dataTypeList.add(TSDataType.INT32);
+    dataTypeList.add(TSDataType.INT32);
+    long expectedMaxReturnSize =
+        3L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    long expectedMaxPeekMemory = expectedMaxReturnSize;
+    long childrenMaxPeekMemory = 0;
+
+    for (int i = 0; i < 4; i++) {
+      Operator child = Mockito.mock(Operator.class);
+      Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+      Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+      expectedMaxPeekMemory += 64 * 1024L;
+      childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+      children.add(child);
+    }
+
+    expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, childrenMaxPeekMemory);
+
+    TimeJoinOperator timeJoinOperator =
+        new TimeJoinOperator(
+            Mockito.mock(OperatorContext.class), children, Ordering.ASC, dataTypeList, null, null);
+
+    assertEquals(expectedMaxPeekMemory, timeJoinOperator.calculateMaxPeekMemory());
+    assertEquals(expectedMaxReturnSize, timeJoinOperator.calculateMaxReturnSize());
+  }
+
+  @Test
+  public void updateLastCacheOperatorTest() {
+    Operator child = Mockito.mock(Operator.class);
+    Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L);
+    Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+
+    UpdateLastCacheOperator updateLastCacheOperator =
+        new UpdateLastCacheOperator(null, child, null, TSDataType.BOOLEAN, null, true);
+
+    assertEquals(2048, updateLastCacheOperator.calculateMaxPeekMemory());
+    assertEquals(1024, updateLastCacheOperator.calculateMaxReturnSize());
+  }
+
+  @Test
+  public void linearFillOperatorTest() {
+    Operator child = Mockito.mock(Operator.class);
+    Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L);
+    Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+
+    LinearFillOperator linearFillOperator =
+        new LinearFillOperator(
+            Mockito.mock(OperatorContext.class), new LinearFill[] {null, null}, child);
+
+    assertEquals(2048 * 3, linearFillOperator.calculateMaxPeekMemory());
+    assertEquals(1024, linearFillOperator.calculateMaxReturnSize());
+  }
+}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index b455746b09..291cf0c46d 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -92,6 +92,8 @@ public enum TSStatusCode {
   UNSUPPORTED_INDEX_FUNC_ERROR(421),
   UNSUPPORTED_INDEX_TYPE_ERROR(422),
 
+  MEMORY_NOT_ENOUGH(423),
+
   INTERNAL_SERVER_ERROR(500),
   CLOSE_OPERATION_ERROR(501),
   READ_ONLY_SYSTEM_ERROR(502),