You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/12 02:11:46 UTC

[iotdb] branch master updated: [IOTDB-5763] Optimize the memory estimate for INTO operations

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 40f00c6cbb [IOTDB-5763] Optimize the memory estimate for INTO operations
40f00c6cbb is described below

commit 40f00c6cbb8a03cfa77477f3f8763a1e64a422ae
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Wed Apr 12 10:11:40 2023 +0800

    [IOTDB-5763] Optimize the memory estimate for INTO operations
---
 docs/UserGuide/Reference/Common-Config-Manual.md   | 10 +++
 .../zh/UserGuide/Reference/Common-Config-Manual.md |  9 +++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 15 ++++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 12 ++++
 .../operator/process/AbstractIntoOperator.java     | 64 ++++++++++++------
 .../operator/process/DeviceViewIntoOperator.java   |  7 +-
 .../execution/operator/process/IntoOperator.java   |  7 +-
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 14 +---
 .../mpp/execution/operator/OperatorMemoryTest.java | 77 ++++++++++++++++++++++
 9 files changed, 177 insertions(+), 38 deletions(-)

diff --git a/docs/UserGuide/Reference/Common-Config-Manual.md b/docs/UserGuide/Reference/Common-Config-Manual.md
index e236c62c13..9b0ad80327 100644
--- a/docs/UserGuide/Reference/Common-Config-Manual.md
+++ b/docs/UserGuide/Reference/Common-Config-Manual.md
@@ -1401,6 +1401,16 @@ Different configuration parameters take effect in the following three ways:
 
 ### SELECT-INTO
 
+* into\_operation\_buffer\_size\_in\_byte
+
+|    Name     | into\_operation\_buffer\_size\_in\_byte                                                                                            |
+| :---------: | :---------------------------------------------------------------------------------------------------------------------------------- |
+| Description | The maximum memory that the data to be written may occupy while a select-into statement is executing (unit: Byte) |
+|    Type     | int64                                                        |
+|   Default   | 104857600 (100MB)                                            |
+|  Effective  | hot-load                                                      |
+
+
 * select\_into\_insert\_tablet\_plan\_row\_limit
 
 |    Name     | select\_into\_insert\_tablet\_plan\_row\_limit                                                                                            |
diff --git a/docs/zh/UserGuide/Reference/Common-Config-Manual.md b/docs/zh/UserGuide/Reference/Common-Config-Manual.md
index f9189ba509..c7b7315b24 100644
--- a/docs/zh/UserGuide/Reference/Common-Config-Manual.md
+++ b/docs/zh/UserGuide/Reference/Common-Config-Manual.md
@@ -1439,6 +1439,15 @@ IoTDB ConfigNode 和 DataNode 的公共配置参数位于 `conf` 目录下。
 
 #### SELECT-INTO配置
 
+* into\_operation\_buffer\_size\_in\_byte
+
+|     名字     | into\_operation\_buffer\_size\_in\_byte                              |
+| :----------: | :-------------------------------------------------------------------- |
+|     描述     | 执行 select-into 语句时,待写入数据占用的最大内存(单位:Byte) |
+|     类型     | int64                                                        |
+|    默认值    | 100MB                                                        |
+| 改后生效方式 | 热加载                                                     |
+
 * select\_into\_insert\_tablet\_plan\_row\_limit
 
 |     名字     | select\_into\_insert\_tablet\_plan\_row\_limit                              |
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 79a4d198b8..53e7f321b2 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -683,6 +683,9 @@ public class IoTDBConfig {
    */
   private long continuousQueryMinimumEveryInterval = 1000;
 
+  /** How much memory may be used in ONE SELECT INTO operation (in Byte). */
+  private long intoOperationBufferSizeInByte = 100 * 1024 * 1024L;
+
   /**
    * The maximum number of rows can be processed in insert-tablet-plan when executing select-into
    * statements.
@@ -1919,14 +1922,22 @@ public class IoTDBConfig {
     this.continuousQueryMinimumEveryInterval = minimumEveryInterval;
   }
 
-  public void setSelectIntoInsertTabletPlanRowLimit(int selectIntoInsertTabletPlanRowLimit) {
-    this.selectIntoInsertTabletPlanRowLimit = selectIntoInsertTabletPlanRowLimit;
+  public long getIntoOperationBufferSizeInByte() {
+    return intoOperationBufferSizeInByte;
+  }
+
+  public void setIntoOperationBufferSizeInByte(long intoOperationBufferSizeInByte) {
+    this.intoOperationBufferSizeInByte = intoOperationBufferSizeInByte;
   }
 
   public int getSelectIntoInsertTabletPlanRowLimit() {
     return selectIntoInsertTabletPlanRowLimit;
   }
 
+  public void setSelectIntoInsertTabletPlanRowLimit(int selectIntoInsertTabletPlanRowLimit) {
+    this.selectIntoInsertTabletPlanRowLimit = selectIntoInsertTabletPlanRowLimit;
+  }
+
   public int getIntoOperationExecutionThreadCount() {
     return intoOperationExecutionThreadCount;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 49693dab20..432c9cbbca 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -912,6 +912,11 @@ public class IoTDBDescriptor {
     // mqtt
     loadMqttProps(properties);
 
+    conf.setIntoOperationBufferSizeInByte(
+        Long.parseLong(
+            properties.getProperty(
+                "into_operation_buffer_size_in_byte",
+                String.valueOf(conf.getIntoOperationBufferSizeInByte()))));
     conf.setSelectIntoInsertTabletPlanRowLimit(
         Integer.parseInt(
             properties.getProperty(
@@ -1474,6 +1479,13 @@ public class IoTDBDescriptor {
               properties.getProperty(
                   "merge_write_throughput_mb_per_sec",
                   Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
+
+      // update select into operation max buffer size
+      conf.setIntoOperationBufferSizeInByte(
+          Long.parseLong(
+              properties.getProperty(
+                  "into_operation_buffer_size_in_byte",
+                  String.valueOf(conf.getIntoOperationBufferSizeInByte()))));
       // update insert-tablet-plan's row limit for select-into
       conf.setSelectIntoInsertTabletPlanRowLimit(
           Integer.parseInt(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 3d65c1659c..8816776d88 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.client.DataNodeInternalClient;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.IntoProcessException;
@@ -78,8 +79,9 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
 
   protected boolean finished = false;
 
-  private final long maxRetainedSize;
-  private final long maxReturnSize;
+  protected int maxRowNumberInStatement;
+  private long maxRetainedSize;
+  private long maxReturnSize;
 
   protected final List<Type> typeConvertors;
 
@@ -89,7 +91,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
       List<TSDataType> inputColumnTypes,
       Map<String, InputLocation> sourceColumnToInputLocationMap,
       ExecutorService intoOperationExecutor,
-      long maxStatementSize) {
+      long statementSizePerLine) {
     this.operatorContext = operatorContext;
     this.child = child;
     this.typeConvertors =
@@ -97,7 +99,23 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
 
     this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap;
     this.writeOperationExecutor = intoOperationExecutor;
+    initMemoryEstimates(statementSizePerLine);
+  }
+
+  private void initMemoryEstimates(long statementSizePerLine) {
+    long intoOperationBufferSizeInByte =
+        IoTDBDescriptor.getInstance().getConfig().getIntoOperationBufferSizeInByte();
+    long memAllowedMaxRowNumber = Math.max(intoOperationBufferSizeInByte / statementSizePerLine, 1);
+    if (memAllowedMaxRowNumber > Integer.MAX_VALUE) {
+      memAllowedMaxRowNumber = Integer.MAX_VALUE;
+    }
+    int maxRowNumberInStatement =
+        Math.min(
+            (int) memAllowedMaxRowNumber,
+            IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit());
+    long maxStatementSize = maxRowNumberInStatement * statementSizePerLine;
 
+    this.maxRowNumberInStatement = maxRowNumberInStatement;
     this.maxRetainedSize = child.calculateMaxReturnSize() + maxStatementSize;
     this.maxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
   }
@@ -209,7 +227,8 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
       Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
       Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
       Map<String, Boolean> targetDeviceToAlignedMap,
-      List<Type> sourceTypeConvertors) {
+      List<Type> sourceTypeConvertors,
+      int maxRowNumberInStatement) {
     List<InsertTabletStatementGenerator> insertTabletStatementGenerators =
         new ArrayList<>(targetPathToSourceInputLocationMap.size());
     for (Map.Entry<PartialPath, Map<String, InputLocation>> entry :
@@ -221,7 +240,8 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
               entry.getValue(),
               targetPathToDataTypeMap.get(targetDevice),
               targetDeviceToAlignedMap.get(targetDevice.toString()),
-              sourceTypeConvertors);
+              sourceTypeConvertors,
+              maxRowNumberInStatement);
       insertTabletStatementGenerators.add(generator);
     }
     return insertTabletStatementGenerators;
@@ -322,10 +342,14 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
     return maxRetainedSize + child.calculateRetainedSizeAfterCallingNext();
   }
 
+  @TestOnly
+  public int getMaxRowNumberInStatement() {
+    return maxRowNumberInStatement;
+  }
+
   public static class InsertTabletStatementGenerator {
 
-    private final int TABLET_ROW_LIMIT =
-        IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
+    private final int rowLimit;
 
     private final PartialPath devicePath;
     private final boolean isAligned;
@@ -348,7 +372,8 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
         Map<String, InputLocation> measurementToInputLocationMap,
         Map<String, TSDataType> measurementToDataTypeMap,
         Boolean isAligned,
-        List<Type> sourceTypeConvertors) {
+        List<Type> sourceTypeConvertors,
+        int rowLimit) {
       this.devicePath = devicePath;
       this.isAligned = isAligned;
       this.measurements = measurementToInputLocationMap.keySet().toArray(new String[0]);
@@ -359,32 +384,33 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
         writtenCounter.put(measurement, new AtomicInteger(0));
       }
       this.sourceTypeConvertors = sourceTypeConvertors;
+      this.rowLimit = rowLimit;
       this.reset();
     }
 
     public void reset() {
       this.rowCount = 0;
-      this.times = new long[TABLET_ROW_LIMIT];
+      this.times = new long[rowLimit];
       this.columns = new Object[this.measurements.length];
       for (int i = 0; i < this.measurements.length; i++) {
         switch (dataTypes[i]) {
           case BOOLEAN:
-            columns[i] = new boolean[TABLET_ROW_LIMIT];
+            columns[i] = new boolean[rowLimit];
             break;
           case INT32:
-            columns[i] = new int[TABLET_ROW_LIMIT];
+            columns[i] = new int[rowLimit];
             break;
           case INT64:
-            columns[i] = new long[TABLET_ROW_LIMIT];
+            columns[i] = new long[rowLimit];
             break;
           case FLOAT:
-            columns[i] = new float[TABLET_ROW_LIMIT];
+            columns[i] = new float[rowLimit];
             break;
           case DOUBLE:
-            columns[i] = new double[TABLET_ROW_LIMIT];
+            columns[i] = new double[rowLimit];
             break;
           case TEXT:
-            columns[i] = new Binary[TABLET_ROW_LIMIT];
+            columns[i] = new Binary[rowLimit];
             Arrays.fill((Binary[]) columns[i], Binary.EMPTY_VALUE);
             break;
           default:
@@ -394,7 +420,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
       }
       this.bitMaps = new BitMap[this.measurements.length];
       for (int i = 0; i < this.bitMaps.length; ++i) {
-        this.bitMaps[i] = new BitMap(TABLET_ROW_LIMIT);
+        this.bitMaps[i] = new BitMap(rowLimit);
         this.bitMaps[i].markAll();
       }
     }
@@ -452,7 +478,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
 
         ++rowCount;
         ++lastReadIndex;
-        if (rowCount == TABLET_ROW_LIMIT) {
+        if (rowCount == rowLimit) {
           break;
         }
       }
@@ -460,7 +486,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
     }
 
     public boolean isFull() {
-      return rowCount == TABLET_ROW_LIMIT;
+      return rowCount == rowLimit;
     }
 
     public boolean isEmpty() {
@@ -475,7 +501,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
       insertTabletStatement.setDataTypes(dataTypes);
       insertTabletStatement.setRowCount(rowCount);
 
-      if (rowCount != TABLET_ROW_LIMIT) {
+      if (rowCount != rowLimit) {
         times = Arrays.copyOf(times, rowCount);
         for (int i = 0; i < columns.length; i++) {
           bitMaps[i] = bitMaps[i].getRegion(0, rowCount);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index 0ddd7ccc31..5f2d202219 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -64,14 +64,14 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
       Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap,
       Map<String, InputLocation> sourceColumnToInputLocationMap,
       ExecutorService intoOperationExecutor,
-      long maxStatementSize) {
+      long statementSizePerLine) {
     super(
         operatorContext,
         child,
         inputColumnTypes,
         sourceColumnToInputLocationMap,
         intoOperationExecutor,
-        maxStatementSize);
+        statementSizePerLine);
     this.deviceToTargetPathSourceInputLocationMap = deviceToTargetPathSourceInputLocationMap;
     this.deviceToTargetPathDataTypeMap = deviceToTargetPathDataTypeMap;
     this.targetDeviceToAlignedMap = targetDeviceToAlignedMap;
@@ -148,7 +148,8 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
         targetPathToSourceInputLocationMap,
         targetPathToDataTypeMap,
         targetDeviceToAlignedMap,
-        typeConvertors);
+        typeConvertors,
+        maxRowNumberInStatement);
   }
 
   private void updateResultTsBlock() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index 34a8edc88a..d5e65d262e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -52,21 +52,22 @@ public class IntoOperator extends AbstractIntoOperator {
       List<Pair<String, PartialPath>> sourceTargetPathPairList,
       Map<String, InputLocation> sourceColumnToInputLocationMap,
       ExecutorService intoOperationExecutor,
-      long maxStatementSize) {
+      long statementSizePerLine) {
     super(
         operatorContext,
         child,
         inputColumnTypes,
         sourceColumnToInputLocationMap,
         intoOperationExecutor,
-        maxStatementSize);
+        statementSizePerLine);
     this.sourceTargetPathPairList = sourceTargetPathPairList;
     insertTabletStatementGenerators =
         constructInsertTabletStatementGenerators(
             targetPathToSourceInputLocationMap,
             targetPathToDataTypeMap,
             targetDeviceToAlignedMap,
-            typeConvertors);
+            typeConvertors,
+            maxRowNumberInStatement);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 1778c70f9c..4e02b1c231 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
 import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
@@ -1639,10 +1638,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
 
     Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap =
         intoPathDescriptor.getTargetPathToDataTypeMap();
-
-    int rowLimit =
-        IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
-    long maxStatementSize = calculateStatementSizePerLine(targetPathToDataTypeMap) * rowLimit;
+    long statementSizePerLine = calculateStatementSizePerLine(targetPathToDataTypeMap);
 
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
 
@@ -1656,7 +1652,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         intoPathDescriptor.getSourceTargetPathPairList(),
         sourceColumnToInputLocationMap,
         FragmentInstanceManager.getInstance().getIntoOperationExecutor(),
-        maxStatementSize);
+        statementSizePerLine);
   }
 
   @Override
@@ -1697,10 +1693,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
           calculateStatementSizePerLine(deviceToTargetPathDataTypeMap.get(sourceDevice));
     }
 
-    int rowLimit =
-        IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
-    long maxStatementSize = statementSizePerLine * rowLimit;
-
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new DeviceViewIntoOperator(
         operatorContext,
@@ -1712,7 +1704,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         deviceViewIntoPathDescriptor.getDeviceToSourceTargetPathPairListMap(),
         sourceColumnToInputLocationMap,
         FragmentInstanceManager.getInstance().getIntoOperationExecutor(),
-        maxStatementSize);
+        statementSizePerLine);
   }
 
   private Map<String, InputLocation> constructSourceColumnToInputLocationMap(PlanNode node) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index d6343d3392..ec3fa4a9b9 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.FilterAndProjectOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.IntoOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
@@ -1422,4 +1423,80 @@ public class OperatorMemoryTest {
         expectedMaxRetainSize + expectedChildrenRetainedSize,
         aggregationOperator.calculateRetainedSizeAfterCallingNext());
   }
+
+  @Test
+  public void intoOperatorTest() {
+    Operator child = Mockito.mock(Operator.class);
+    Mockito.when(child.calculateMaxPeekMemory())
+        .thenReturn((long) DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+    Mockito.when(child.calculateMaxReturnSize())
+        .thenReturn((long) DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+    Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+
+    long statementSizePerLine1 = 8 + 1000 * (4 + 8 + 4 + 8 + 1 + 512);
+    IntoOperator intoOperator1 = createIntoOperator(child, statementSizePerLine1);
+    int expectedMaxRowNumber = 195;
+    long expectedMaxStatementSize = expectedMaxRowNumber * statementSizePerLine1;
+    assertEquals(expectedMaxRowNumber, intoOperator1.getMaxRowNumberInStatement());
+    assertEquals(
+        expectedMaxStatementSize + 3L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        intoOperator1.calculateMaxPeekMemory());
+    assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, intoOperator1.calculateMaxReturnSize());
+    assertEquals(
+        expectedMaxStatementSize + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        intoOperator1.calculateRetainedSizeAfterCallingNext());
+
+    long statementSizePerLine2 = 8 + 1000 * (4 + 8 + 4 + 8 + 1);
+    IntoOperator intoOperator2 = createIntoOperator(child, statementSizePerLine2);
+    expectedMaxRowNumber = 4192;
+    expectedMaxStatementSize = expectedMaxRowNumber * statementSizePerLine2;
+    assertEquals(expectedMaxRowNumber, intoOperator2.getMaxRowNumberInStatement());
+    assertEquals(
+        expectedMaxStatementSize + 3L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        intoOperator2.calculateMaxPeekMemory());
+    assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, intoOperator2.calculateMaxReturnSize());
+    assertEquals(
+        expectedMaxStatementSize + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        intoOperator2.calculateRetainedSizeAfterCallingNext());
+
+    long statementSizePerLine3 = 8 + 100 * (4 + 8 + 4 + 8 + 1);
+    IntoOperator intoOperator3 = createIntoOperator(child, statementSizePerLine3);
+    expectedMaxRowNumber = 10000;
+    expectedMaxStatementSize = expectedMaxRowNumber * statementSizePerLine3;
+    assertEquals(expectedMaxRowNumber, intoOperator3.getMaxRowNumberInStatement());
+    assertEquals(
+        expectedMaxStatementSize + 3L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        intoOperator3.calculateMaxPeekMemory());
+    assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, intoOperator3.calculateMaxReturnSize());
+    assertEquals(
+        expectedMaxStatementSize + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        intoOperator3.calculateRetainedSizeAfterCallingNext());
+
+    long statementSizePerLine4 = 8 + 1000000 * (4 + 8 + 4 + 8 + 1 + 512);
+    IntoOperator intoOperator4 = createIntoOperator(child, statementSizePerLine4);
+    expectedMaxRowNumber = 1;
+    expectedMaxStatementSize = expectedMaxRowNumber * statementSizePerLine4;
+    assertEquals(expectedMaxRowNumber, intoOperator4.getMaxRowNumberInStatement());
+    assertEquals(
+        expectedMaxStatementSize + 3L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        intoOperator4.calculateMaxPeekMemory());
+    assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, intoOperator4.calculateMaxReturnSize());
+    assertEquals(
+        expectedMaxStatementSize + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        intoOperator4.calculateRetainedSizeAfterCallingNext());
+  }
+
+  private IntoOperator createIntoOperator(Operator child, long statementSizePerLine) {
+    return new IntoOperator(
+        Mockito.mock(OperatorContext.class),
+        child,
+        Collections.emptyList(),
+        Collections.emptyMap(),
+        Collections.emptyMap(),
+        Collections.emptyMap(),
+        Collections.emptyList(),
+        Collections.emptyMap(),
+        null,
+        statementSizePerLine);
+  }
 }