You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/11 14:20:12 UTC

[iotdb] branch MemoryControl updated: Memory control logic for some operators (#6964)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch MemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/MemoryControl by this push:
     new bb9d2ff8c7 Memory control logic for some operators (#6964)
bb9d2ff8c7 is described below

commit bb9d2ff8c757325a4f0cd5b112f4c3672e9a682e
Author: Liao Lanyu <10...@users.noreply.github.com>
AuthorDate: Thu Aug 11 22:20:06 2022 +0800

    Memory control logic for some operators (#6964)
---
 .../operator/process/DeviceMergeOperator.java      |  34 +++++
 .../operator/process/DeviceViewOperator.java       |  28 ++++
 .../operator/process/FilterAndProjectOperator.java | 108 +++++++++++++++
 .../column/multi/MappableUDFColumnTransformer.java |   4 +
 .../column/ternary/TernaryColumnTransformer.java   |  12 ++
 .../dag/column/unary/UnaryColumnTransformer.java   |   4 +
 .../mpp/execution/operator/OperatorMemoryTest.java | 152 +++++++++++++++++++++
 7 files changed, 342 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
index e0ccbe86ba..511a75991b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator;
@@ -278,4 +279,37 @@ public class DeviceMergeOperator implements ProcessOperator {
     return inputTsBlocks[tsBlockIndex] == null
         || inputTsBlocks[tsBlockIndex].getPositionCount() == 0;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    // timeSelector will cache time, we use a single time column to represent max memory cost
+    long maxPeekMemory = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    // inputTsBlocks will cache all TsBlocks returned by deviceOperators
+    for (Operator operator : deviceOperators) {
+      maxPeekMemory += operator.calculateMaxReturnSize();
+      maxPeekMemory += operator.calculateRetainedSizeAfterCallingNext();
+    }
+    for (Operator operator : deviceOperators) {
+      maxPeekMemory = Math.max(maxPeekMemory, operator.calculateMaxPeekMemory());
+    }
+    return Math.max(maxPeekMemory, calculateMaxReturnSize());
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    // time + all value columns
+    return (1L + dataTypes.size()) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long currentRetainedSize = 0, minChildReturnSize = Long.MAX_VALUE;
+    for (Operator child : deviceOperators) {
+      long maxReturnSize = child.calculateMaxReturnSize();
+      currentRetainedSize += (maxReturnSize + child.calculateRetainedSizeAfterCallingNext());
+      minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
+    }
+    // max cached TsBlock
+    return currentRetainedSize - minChildReturnSize;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
index 127a26a8b6..728da7e7db 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumnBuilder;
@@ -152,4 +153,31 @@ public class DeviceViewOperator implements ProcessOperator {
   public boolean isFinished() {
     return !this.hasNext();
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    long maxPeekMemory = calculateMaxReturnSize() + calculateRetainedSizeAfterCallingNext();
+    for (Operator child : deviceOperators) {
+      maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory());
+    }
+    return maxPeekMemory;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    // null columns would be filled, so return size equals to
+    // (numberOfValueColumns(dataTypes.size() - 1) + 1(timeColumn)) * columnSize + deviceColumnSize
+    // size of device name column is ignored
+    return (long) (dataTypes.size())
+        * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long sum = 0;
+    for (Operator operator : deviceOperators) {
+      sum += operator.calculateRetainedSizeAfterCallingNext();
+    }
+    return sum;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
index 60dc07b2f2..80b75a40e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
@@ -22,7 +22,14 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.binary.BinaryColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.ConstantColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.LeafColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.TimeColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.multi.MappableUDFColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ternary.TernaryColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.unary.UnaryColumnTransformer;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
@@ -199,4 +206,105 @@ public class FilterAndProjectOperator implements ProcessOperator {
   public ListenableFuture<?> isBlocked() {
     return inputOperator.isBlocked();
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    long maxPeekMemory = inputOperator.calculateMaxReturnSize();
+    int maxCachedColumn = 0;
+    // Only do projection, calculate max cached column size of calc tree
+    if (!hasFilter) {
+      for (int i = 0; i < projectOutputTransformerList.size(); i++) {
+        ColumnTransformer c = projectOutputTransformerList.get(i);
+        maxCachedColumn = Math.max(maxCachedColumn, 1 + i + getMaxLevelOfColumnTransformerTree(c));
+      }
+      return Math.max(
+              maxPeekMemory,
+              (long) maxCachedColumn
+                  * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte())
+          + inputOperator.calculateRetainedSizeAfterCallingNext();
+    }
+
+    // has Filter
+    maxCachedColumn =
+        Math.max(
+            1 + getMaxLevelOfColumnTransformerTree(filterOutputTransformer),
+            1 + commonTransformerList.size());
+    if (!hasNonMappableUDF) {
+      for (int i = 0; i < projectOutputTransformerList.size(); i++) {
+        ColumnTransformer c = projectOutputTransformerList.get(i);
+        maxCachedColumn = Math.max(maxCachedColumn, 1 + i + getMaxLevelOfColumnTransformerTree(c));
+      }
+    }
+    return Math.max(
+            maxPeekMemory,
+            (long) maxCachedColumn * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte())
+        + inputOperator.calculateRetainedSizeAfterCallingNext();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    // time + all value columns
+    if (!hasFilter || !hasNonMappableUDF) {
+      return (long) (1 + projectOutputTransformerList.size())
+          * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    } else {
+      return (long) (1 + filterTsBlockBuilder.getValueColumnBuilders().length)
+          * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    }
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return inputOperator.calculateRetainedSizeAfterCallingNext();
+  }
+
+  private int getMaxLevelOfColumnTransformerTree(ColumnTransformer columnTransformer) {
+    if (columnTransformer instanceof LeafColumnTransformer) {
+      // Time column is always calculated, we ignore it here. Constant column is ignored.
+      if (columnTransformer instanceof ConstantColumnTransformer
+          || columnTransformer instanceof TimeColumnTransformer) {
+        return 0;
+      } else {
+        return 1;
+      }
+    } else if (columnTransformer instanceof UnaryColumnTransformer) {
+      return Math.max(
+          2,
+          getMaxLevelOfColumnTransformerTree(
+              ((UnaryColumnTransformer) columnTransformer).getChildColumnTransformer()));
+    } else if (columnTransformer instanceof BinaryColumnTransformer) {
+      int childMaxLevel =
+          Math.max(
+              getMaxLevelOfColumnTransformerTree(
+                  ((BinaryColumnTransformer) columnTransformer).getLeftTransformer()),
+              getMaxLevelOfColumnTransformerTree(
+                  ((BinaryColumnTransformer) columnTransformer).getRightTransformer()));
+      return Math.max(3, childMaxLevel);
+    } else if (columnTransformer instanceof TernaryColumnTransformer) {
+      int childMaxLevel =
+          Math.max(
+              getMaxLevelOfColumnTransformerTree(
+                  ((TernaryColumnTransformer) columnTransformer).getFirstColumnTransformer()),
+              Math.max(
+                  getMaxLevelOfColumnTransformerTree(
+                      ((TernaryColumnTransformer) columnTransformer).getSecondColumnTransformer()),
+                  getMaxLevelOfColumnTransformerTree(
+                      ((TernaryColumnTransformer) columnTransformer).getThirdColumnTransformer())));
+      return Math.max(4, childMaxLevel);
+    } else if (columnTransformer instanceof MappableUDFColumnTransformer) {
+      int childMaxLevel = 0;
+      for (ColumnTransformer c :
+          ((MappableUDFColumnTransformer) columnTransformer).getInputColumnTransformers()) {
+        childMaxLevel = Math.max(childMaxLevel, getMaxLevelOfColumnTransformerTree(c));
+      }
+      return Math.max(
+          1
+              + ((MappableUDFColumnTransformer) columnTransformer)
+                  .getInputColumnTransformers()
+                  .length,
+          childMaxLevel);
+    } else {
+      throw new UnsupportedOperationException("Unsupported ColumnTransformer");
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/multi/MappableUDFColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/multi/MappableUDFColumnTransformer.java
index 87cecc817a..d9693aa671 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/multi/MappableUDFColumnTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/multi/MappableUDFColumnTransformer.java
@@ -88,4 +88,8 @@ public class MappableUDFColumnTransformer extends ColumnTransformer {
   protected void checkType() {
     // do nothing
   }
+
+  public ColumnTransformer[] getInputColumnTransformers() {
+    return inputColumnTransformers;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ternary/TernaryColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ternary/TernaryColumnTransformer.java
index 2032215b0f..096cef4d5e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ternary/TernaryColumnTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ternary/TernaryColumnTransformer.java
@@ -41,4 +41,16 @@ public abstract class TernaryColumnTransformer extends ColumnTransformer {
     this.thirdColumnTransformer = thirdColumnTransformer;
     checkType();
   }
+
+  public ColumnTransformer getFirstColumnTransformer() {
+    return firstColumnTransformer;
+  }
+
+  public ColumnTransformer getSecondColumnTransformer() {
+    return secondColumnTransformer;
+  }
+
+  public ColumnTransformer getThirdColumnTransformer() {
+    return thirdColumnTransformer;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/UnaryColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/UnaryColumnTransformer.java
index d84b538cc6..c0682169b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/UnaryColumnTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/UnaryColumnTransformer.java
@@ -42,6 +42,10 @@ public abstract class UnaryColumnTransformer extends ColumnTransformer {
     initializeColumnCache(columnBuilder.build());
   }
 
+  public ColumnTransformer getChildColumnTransformer() {
+    return childColumnTransformer;
+  }
+
   @Override
   protected void checkType() {
     // do nothing
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index 349e98758b..c718942108 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -27,7 +27,10 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.FilterAndProjectOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
@@ -36,6 +39,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.RowBasedTimeJoinOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryCollectOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryOperator;
@@ -47,10 +51,20 @@ import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.binary.ArithmeticAdditionColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.binary.CompareLessEqualColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.ConstantColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.TimeColumnTransformer;
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.type.BooleanType;
+import org.apache.iotdb.tsfile.read.common.type.LongType;
+import org.apache.iotdb.tsfile.read.common.type.TypeEnum;
 
 import com.google.common.collect.Sets;
 import org.junit.Test;
@@ -467,4 +481,142 @@ public class OperatorMemoryTest {
     assertEquals(1024, linearFillOperator.calculateMaxReturnSize());
     assertEquals(512, linearFillOperator.calculateRetainedSizeAfterCallingNext());
   }
+
+  @Test
+  public void deviceMergeOperatorTest() {
+    List<Operator> children = new ArrayList<>(4);
+    List<TSDataType> dataTypeList = new ArrayList<>(2);
+    dataTypeList.add(TSDataType.INT32);
+    dataTypeList.add(TSDataType.INT32);
+    List<String> devices = new ArrayList<>(4);
+    devices.add("device1");
+    devices.add("device2");
+    devices.add("device3");
+    devices.add("device4");
+    long expectedMaxReturnSize =
+        3L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    long expectedMaxPeekMemory = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    long expectedRetainedSizeAfterCallingNext = 0;
+    long childrenMaxPeekMemory = 0;
+
+    for (int i = 0; i < 4; i++) {
+      Operator child = Mockito.mock(Operator.class);
+      Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+      Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+      Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(64 * 1024L);
+      expectedMaxPeekMemory += 128 * 1024L;
+      childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+      expectedRetainedSizeAfterCallingNext += 128 * 1024L;
+      children.add(child);
+    }
+
+    expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, childrenMaxPeekMemory);
+
+    DeviceMergeOperator deviceMergeOperator =
+        new DeviceMergeOperator(
+            Mockito.mock(OperatorContext.class),
+            devices,
+            children,
+            dataTypeList,
+            Mockito.mock(TimeSelector.class),
+            Mockito.mock(TimeComparator.class));
+
+    assertEquals(expectedMaxPeekMemory, deviceMergeOperator.calculateMaxPeekMemory());
+    assertEquals(expectedMaxReturnSize, deviceMergeOperator.calculateMaxReturnSize());
+    assertEquals(
+        expectedRetainedSizeAfterCallingNext - 64 * 1024L,
+        deviceMergeOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  @Test
+  public void deviceViewOperatorTest() {
+    List<Operator> children = new ArrayList<>(4);
+    List<TSDataType> dataTypeList = new ArrayList<>(2);
+    dataTypeList.add(TSDataType.INT32);
+    dataTypeList.add(TSDataType.INT32);
+    List<String> devices = new ArrayList<>(4);
+    devices.add("device1");
+    devices.add("device2");
+    devices.add("device3");
+    devices.add("device4");
+    long expectedMaxReturnSize =
+        2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    long expectedMaxPeekMemory = expectedMaxReturnSize;
+    long expectedRetainedSizeAfterCallingNext = 0;
+    long childrenMaxPeekMemory = 0;
+
+    for (int i = 0; i < 4; i++) {
+      Operator child = Mockito.mock(Operator.class);
+      Mockito.when(child.calculateMaxPeekMemory()).thenReturn(1024L);
+      Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+      Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(1024L);
+      expectedMaxPeekMemory += 1024L;
+      childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+      expectedRetainedSizeAfterCallingNext += 1024L;
+      children.add(child);
+    }
+
+    expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, childrenMaxPeekMemory);
+
+    DeviceViewOperator deviceViewOperator =
+        new DeviceViewOperator(
+            Mockito.mock(OperatorContext.class),
+            devices,
+            children,
+            new ArrayList<>(),
+            dataTypeList);
+
+    assertEquals(expectedMaxPeekMemory, deviceViewOperator.calculateMaxPeekMemory());
+    assertEquals(expectedMaxReturnSize, deviceViewOperator.calculateMaxReturnSize());
+    assertEquals(
+        expectedRetainedSizeAfterCallingNext,
+        deviceViewOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  @Test
+  public void filterAndProjectOperatorTest() {
+    Operator child = Mockito.mock(Operator.class);
+    Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L);
+    Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+    Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
+    BooleanType booleanType = Mockito.mock(BooleanType.class);
+    Mockito.when(booleanType.getTypeEnum()).thenReturn(TypeEnum.BOOLEAN);
+    LongType longType = Mockito.mock(LongType.class);
+    Mockito.when(longType.getTypeEnum()).thenReturn(TypeEnum.INT64);
+    ColumnTransformer filterColumnTransformer =
+        new CompareLessEqualColumnTransformer(
+            booleanType,
+            new TimeColumnTransformer(longType),
+            new ConstantColumnTransformer(longType, Mockito.mock(IntColumn.class)));
+    List<TSDataType> filterOutputTypes = new ArrayList<>();
+    filterOutputTypes.add(TSDataType.INT32);
+    filterOutputTypes.add(TSDataType.INT64);
+    List<ColumnTransformer> projectColumnTransformers = new ArrayList<>();
+    projectColumnTransformers.add(
+        new ArithmeticAdditionColumnTransformer(
+            booleanType,
+            new TimeColumnTransformer(longType),
+            new ConstantColumnTransformer(longType, Mockito.mock(IntColumn.class))));
+
+    FilterAndProjectOperator operator =
+        new FilterAndProjectOperator(
+            Mockito.mock(OperatorContext.class),
+            child,
+            filterOutputTypes,
+            new ArrayList<>(),
+            filterColumnTransformer,
+            new ArrayList<>(),
+            new ArrayList<>(),
+            projectColumnTransformers,
+            false,
+            true);
+
+    assertEquals(
+        4L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() + 512L,
+        operator.calculateMaxPeekMemory());
+    assertEquals(
+        2 * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
+        operator.calculateMaxReturnSize());
+    assertEquals(512, operator.calculateRetainedSizeAfterCallingNext());
+  }
 }