You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/11 14:20:12 UTC
[iotdb] branch MemoryControl updated: Memory control logic for some operators (#6964)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch MemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/MemoryControl by this push:
new bb9d2ff8c7 Memory control logic for some operators (#6964)
bb9d2ff8c7 is described below
commit bb9d2ff8c757325a4f0cd5b112f4c3672e9a682e
Author: Liao Lanyu <10...@users.noreply.github.com>
AuthorDate: Thu Aug 11 22:20:06 2022 +0800
Memory control logic for some operators (#6964)
---
.../operator/process/DeviceMergeOperator.java | 34 +++++
.../operator/process/DeviceViewOperator.java | 28 ++++
.../operator/process/FilterAndProjectOperator.java | 108 +++++++++++++++
.../column/multi/MappableUDFColumnTransformer.java | 4 +
.../column/ternary/TernaryColumnTransformer.java | 12 ++
.../dag/column/unary/UnaryColumnTransformer.java | 4 +
.../mpp/execution/operator/OperatorMemoryTest.java | 152 +++++++++++++++++++++
7 files changed, 342 insertions(+)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
index e0ccbe86ba..511a75991b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator;
@@ -278,4 +279,37 @@ public class DeviceMergeOperator implements ProcessOperator {
return inputTsBlocks[tsBlockIndex] == null
|| inputTsBlocks[tsBlockIndex].getPositionCount() == 0;
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ // timeSelector will cache time, we use a single time column to represent max memory cost
+ long maxPeekMemory = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ // inputTsBlocks will cache all TsBlocks returned by deviceOperators
+ for (Operator operator : deviceOperators) {
+ maxPeekMemory += operator.calculateMaxReturnSize();
+ maxPeekMemory += operator.calculateRetainedSizeAfterCallingNext();
+ }
+ for (Operator operator : deviceOperators) {
+ maxPeekMemory = Math.max(maxPeekMemory, operator.calculateMaxPeekMemory());
+ }
+ return Math.max(maxPeekMemory, calculateMaxReturnSize());
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ // time + all value columns
+ return (1L + dataTypes.size()) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ long currentRetainedSize = 0, minChildReturnSize = Long.MAX_VALUE;
+ for (Operator child : deviceOperators) {
+ long maxReturnSize = child.calculateMaxReturnSize();
+ currentRetainedSize += (maxReturnSize + child.calculateRetainedSizeAfterCallingNext());
+ minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
+ }
+ // max cached TsBlock
+ return currentRetainedSize - minChildReturnSize;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
index 127a26a8b6..728da7e7db 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumnBuilder;
@@ -152,4 +153,31 @@ public class DeviceViewOperator implements ProcessOperator {
public boolean isFinished() {
return !this.hasNext();
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ long maxPeekMemory = calculateMaxReturnSize() + calculateRetainedSizeAfterCallingNext();
+ for (Operator child : deviceOperators) {
+ maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory());
+ }
+ return maxPeekMemory;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ // null columns would be filled, so return size equals to
+ // (numberOfValueColumns(dataTypes.size() - 1) + 1(timeColumn)) * columnSize + deviceColumnSize
+ // size of device name column is ignored
+ return (long) (dataTypes.size())
+ * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ long sum = 0;
+ for (Operator operator : deviceOperators) {
+ sum += operator.calculateRetainedSizeAfterCallingNext();
+ }
+ return sum;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
index 60dc07b2f2..80b75a40e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
@@ -22,7 +22,14 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.binary.BinaryColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.ConstantColumnTransformer;
import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.LeafColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.TimeColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.multi.MappableUDFColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ternary.TernaryColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.unary.UnaryColumnTransformer;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
@@ -199,4 +206,105 @@ public class FilterAndProjectOperator implements ProcessOperator {
public ListenableFuture<?> isBlocked() {
return inputOperator.isBlocked();
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ long maxPeekMemory = inputOperator.calculateMaxReturnSize();
+ int maxCachedColumn = 0;
+ // Only do projection, calculate max cached column size of calc tree
+ if (!hasFilter) {
+ for (int i = 0; i < projectOutputTransformerList.size(); i++) {
+ ColumnTransformer c = projectOutputTransformerList.get(i);
+ maxCachedColumn = Math.max(maxCachedColumn, 1 + i + getMaxLevelOfColumnTransformerTree(c));
+ }
+ return Math.max(
+ maxPeekMemory,
+ (long) maxCachedColumn
+ * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte())
+ + inputOperator.calculateRetainedSizeAfterCallingNext();
+ }
+
+ // has Filter
+ maxCachedColumn =
+ Math.max(
+ 1 + getMaxLevelOfColumnTransformerTree(filterOutputTransformer),
+ 1 + commonTransformerList.size());
+ if (!hasNonMappableUDF) {
+ for (int i = 0; i < projectOutputTransformerList.size(); i++) {
+ ColumnTransformer c = projectOutputTransformerList.get(i);
+ maxCachedColumn = Math.max(maxCachedColumn, 1 + i + getMaxLevelOfColumnTransformerTree(c));
+ }
+ }
+ return Math.max(
+ maxPeekMemory,
+ (long) maxCachedColumn * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte())
+ + inputOperator.calculateRetainedSizeAfterCallingNext();
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ // time + all value columns
+ if (!hasFilter || !hasNonMappableUDF) {
+ return (long) (1 + projectOutputTransformerList.size())
+ * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ } else {
+ return (long) (1 + filterTsBlockBuilder.getValueColumnBuilders().length)
+ * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ }
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return inputOperator.calculateRetainedSizeAfterCallingNext();
+ }
+
+ private int getMaxLevelOfColumnTransformerTree(ColumnTransformer columnTransformer) {
+ if (columnTransformer instanceof LeafColumnTransformer) {
+ // Time column is always calculated, we ignore it here. Constant column is ignored.
+ if (columnTransformer instanceof ConstantColumnTransformer
+ || columnTransformer instanceof TimeColumnTransformer) {
+ return 0;
+ } else {
+ return 1;
+ }
+ } else if (columnTransformer instanceof UnaryColumnTransformer) {
+ return Math.max(
+ 2,
+ getMaxLevelOfColumnTransformerTree(
+ ((UnaryColumnTransformer) columnTransformer).getChildColumnTransformer()));
+ } else if (columnTransformer instanceof BinaryColumnTransformer) {
+ int childMaxLevel =
+ Math.max(
+ getMaxLevelOfColumnTransformerTree(
+ ((BinaryColumnTransformer) columnTransformer).getLeftTransformer()),
+ getMaxLevelOfColumnTransformerTree(
+ ((BinaryColumnTransformer) columnTransformer).getRightTransformer()));
+ return Math.max(3, childMaxLevel);
+ } else if (columnTransformer instanceof TernaryColumnTransformer) {
+ int childMaxLevel =
+ Math.max(
+ getMaxLevelOfColumnTransformerTree(
+ ((TernaryColumnTransformer) columnTransformer).getFirstColumnTransformer()),
+ Math.max(
+ getMaxLevelOfColumnTransformerTree(
+ ((TernaryColumnTransformer) columnTransformer).getSecondColumnTransformer()),
+ getMaxLevelOfColumnTransformerTree(
+ ((TernaryColumnTransformer) columnTransformer).getThirdColumnTransformer())));
+ return Math.max(4, childMaxLevel);
+ } else if (columnTransformer instanceof MappableUDFColumnTransformer) {
+ int childMaxLevel = 0;
+ for (ColumnTransformer c :
+ ((MappableUDFColumnTransformer) columnTransformer).getInputColumnTransformers()) {
+ childMaxLevel = Math.max(childMaxLevel, getMaxLevelOfColumnTransformerTree(c));
+ }
+ return Math.max(
+ 1
+ + ((MappableUDFColumnTransformer) columnTransformer)
+ .getInputColumnTransformers()
+ .length,
+ childMaxLevel);
+ } else {
+ throw new UnsupportedOperationException("Unsupported ColumnTransformer");
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/multi/MappableUDFColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/multi/MappableUDFColumnTransformer.java
index 87cecc817a..d9693aa671 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/multi/MappableUDFColumnTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/multi/MappableUDFColumnTransformer.java
@@ -88,4 +88,8 @@ public class MappableUDFColumnTransformer extends ColumnTransformer {
protected void checkType() {
// do nothing
}
+
+ public ColumnTransformer[] getInputColumnTransformers() {
+ return inputColumnTransformers;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ternary/TernaryColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ternary/TernaryColumnTransformer.java
index 2032215b0f..096cef4d5e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ternary/TernaryColumnTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ternary/TernaryColumnTransformer.java
@@ -41,4 +41,16 @@ public abstract class TernaryColumnTransformer extends ColumnTransformer {
this.thirdColumnTransformer = thirdColumnTransformer;
checkType();
}
+
+ public ColumnTransformer getFirstColumnTransformer() {
+ return firstColumnTransformer;
+ }
+
+ public ColumnTransformer getSecondColumnTransformer() {
+ return secondColumnTransformer;
+ }
+
+ public ColumnTransformer getThirdColumnTransformer() {
+ return thirdColumnTransformer;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/UnaryColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/UnaryColumnTransformer.java
index d84b538cc6..c0682169b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/UnaryColumnTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/UnaryColumnTransformer.java
@@ -42,6 +42,10 @@ public abstract class UnaryColumnTransformer extends ColumnTransformer {
initializeColumnCache(columnBuilder.build());
}
+ public ColumnTransformer getChildColumnTransformer() {
+ return childColumnTransformer;
+ }
+
@Override
protected void checkType() {
// do nothing
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index 349e98758b..c718942108 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -27,7 +27,10 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.FilterAndProjectOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
@@ -36,6 +39,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
import org.apache.iotdb.db.mpp.execution.operator.process.join.RowBasedTimeJoinOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryCollectOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryMergeOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryOperator;
@@ -47,10 +51,20 @@ import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.binary.ArithmeticAdditionColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.binary.CompareLessEqualColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.ConstantColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.TimeColumnTransformer;
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.type.BooleanType;
+import org.apache.iotdb.tsfile.read.common.type.LongType;
+import org.apache.iotdb.tsfile.read.common.type.TypeEnum;
import com.google.common.collect.Sets;
import org.junit.Test;
@@ -467,4 +481,142 @@ public class OperatorMemoryTest {
assertEquals(1024, linearFillOperator.calculateMaxReturnSize());
assertEquals(512, linearFillOperator.calculateRetainedSizeAfterCallingNext());
}
+
+ @Test
+ public void deviceMergeOperatorTest() {
+ List<Operator> children = new ArrayList<>(4);
+ List<TSDataType> dataTypeList = new ArrayList<>(2);
+ dataTypeList.add(TSDataType.INT32);
+ dataTypeList.add(TSDataType.INT32);
+ List<String> devices = new ArrayList<>(4);
+ devices.add("device1");
+ devices.add("device2");
+ devices.add("device3");
+ devices.add("device4");
+ long expectedMaxReturnSize =
+ 3L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ long expectedMaxPeekMemory = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ long expectedRetainedSizeAfterCallingNext = 0;
+ long childrenMaxPeekMemory = 0;
+
+ for (int i = 0; i < 4; i++) {
+ Operator child = Mockito.mock(Operator.class);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+ Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(64 * 1024L);
+ expectedMaxPeekMemory += 128 * 1024L;
+ childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+ expectedRetainedSizeAfterCallingNext += 128 * 1024L;
+ children.add(child);
+ }
+
+ expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, childrenMaxPeekMemory);
+
+ DeviceMergeOperator deviceMergeOperator =
+ new DeviceMergeOperator(
+ Mockito.mock(OperatorContext.class),
+ devices,
+ children,
+ dataTypeList,
+ Mockito.mock(TimeSelector.class),
+ Mockito.mock(TimeComparator.class));
+
+ assertEquals(expectedMaxPeekMemory, deviceMergeOperator.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize, deviceMergeOperator.calculateMaxReturnSize());
+ assertEquals(
+ expectedRetainedSizeAfterCallingNext - 64 * 1024L,
+ deviceMergeOperator.calculateRetainedSizeAfterCallingNext());
+ }
+
+ @Test
+ public void deviceViewOperatorTest() {
+ List<Operator> children = new ArrayList<>(4);
+ List<TSDataType> dataTypeList = new ArrayList<>(2);
+ dataTypeList.add(TSDataType.INT32);
+ dataTypeList.add(TSDataType.INT32);
+ List<String> devices = new ArrayList<>(4);
+ devices.add("device1");
+ devices.add("device2");
+ devices.add("device3");
+ devices.add("device4");
+ long expectedMaxReturnSize =
+ 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ long expectedMaxPeekMemory = expectedMaxReturnSize;
+ long expectedRetainedSizeAfterCallingNext = 0;
+ long childrenMaxPeekMemory = 0;
+
+ for (int i = 0; i < 4; i++) {
+ Operator child = Mockito.mock(Operator.class);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(1024L);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+ Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(1024L);
+ expectedMaxPeekMemory += 1024L;
+ childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+ expectedRetainedSizeAfterCallingNext += 1024L;
+ children.add(child);
+ }
+
+ expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, childrenMaxPeekMemory);
+
+ DeviceViewOperator deviceViewOperator =
+ new DeviceViewOperator(
+ Mockito.mock(OperatorContext.class),
+ devices,
+ children,
+ new ArrayList<>(),
+ dataTypeList);
+
+ assertEquals(expectedMaxPeekMemory, deviceViewOperator.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize, deviceViewOperator.calculateMaxReturnSize());
+ assertEquals(
+ expectedRetainedSizeAfterCallingNext,
+ deviceViewOperator.calculateRetainedSizeAfterCallingNext());
+ }
+
+ @Test
+ public void filterAndProjectOperatorTest() {
+ Operator child = Mockito.mock(Operator.class);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+ Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
+ BooleanType booleanType = Mockito.mock(BooleanType.class);
+ Mockito.when(booleanType.getTypeEnum()).thenReturn(TypeEnum.BOOLEAN);
+ LongType longType = Mockito.mock(LongType.class);
+ Mockito.when(longType.getTypeEnum()).thenReturn(TypeEnum.INT64);
+ ColumnTransformer filterColumnTransformer =
+ new CompareLessEqualColumnTransformer(
+ booleanType,
+ new TimeColumnTransformer(longType),
+ new ConstantColumnTransformer(longType, Mockito.mock(IntColumn.class)));
+ List<TSDataType> filterOutputTypes = new ArrayList<>();
+ filterOutputTypes.add(TSDataType.INT32);
+ filterOutputTypes.add(TSDataType.INT64);
+ List<ColumnTransformer> projectColumnTransformers = new ArrayList<>();
+ projectColumnTransformers.add(
+ new ArithmeticAdditionColumnTransformer(
+ booleanType,
+ new TimeColumnTransformer(longType),
+ new ConstantColumnTransformer(longType, Mockito.mock(IntColumn.class))));
+
+ FilterAndProjectOperator operator =
+ new FilterAndProjectOperator(
+ Mockito.mock(OperatorContext.class),
+ child,
+ filterOutputTypes,
+ new ArrayList<>(),
+ filterColumnTransformer,
+ new ArrayList<>(),
+ new ArrayList<>(),
+ projectColumnTransformers,
+ false,
+ true);
+
+ assertEquals(
+ 4L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() + 512L,
+ operator.calculateMaxPeekMemory());
+ assertEquals(
+ 2 * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
+ operator.calculateMaxReturnSize());
+ assertEquals(512, operator.calculateRetainedSizeAfterCallingNext());
+ }
}