You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/04/12 13:59:31 UTC

[iotdb] branch lmh/IOTDB_5763_1.1 created (now 085080ff6e)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a change to branch lmh/IOTDB_5763_1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 085080ff6e [IOTDB-5763] Optimize the memory estimate for INTO operations

This branch includes the following new commits:

     new 085080ff6e [IOTDB-5763] Optimize the memory estimate for INTO operations

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [IOTDB-5763] Optimize the memory estimate for INTO operations

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/IOTDB_5763_1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 085080ff6efe3550b261a13e9826780538fc2d18
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Wed Apr 12 10:11:40 2023 +0800

    [IOTDB-5763] Optimize the memory estimate for INTO operations
    
    (cherry picked from commit 40f00c6cbb8a03cfa77477f3f8763a1e64a422ae)
---
 docs/UserGuide/Reference/Common-Config-Manual.md   | 10 +++
 .../zh/UserGuide/Reference/Common-Config-Manual.md |  9 +++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 15 ++++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 12 ++++
 .../operator/process/AbstractIntoOperator.java     | 64 ++++++++++++------
 .../operator/process/DeviceViewIntoOperator.java   |  7 +-
 .../execution/operator/process/IntoOperator.java   |  7 +-
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 14 +---
 .../mpp/execution/operator/OperatorMemoryTest.java | 77 ++++++++++++++++++++++
 9 files changed, 177 insertions(+), 38 deletions(-)

diff --git a/docs/UserGuide/Reference/Common-Config-Manual.md b/docs/UserGuide/Reference/Common-Config-Manual.md
index 0c95f5a317..095213a237 100644
--- a/docs/UserGuide/Reference/Common-Config-Manual.md
+++ b/docs/UserGuide/Reference/Common-Config-Manual.md
@@ -1401,6 +1401,16 @@ Different configuration parameters take effect in the following three ways:
 
 ### SELECT-INTO
 
+* into\_operation\_buffer\_size\_in\_byte
+
+|    Name     | into\_operation\_buffer\_size\_in\_byte                                                                                            |
+| :---------: | :---------------------------------------------------------------------------------------------------------------------------------- |
+| Description | The maximum memory that the data to be written may occupy while a select-into statement is executed (unit: bytes) |
+|    Type     | int64                                                        |
+|   Default   | 100MB                                                        |
+|  Effective  | hot-load                                                      |
+
+
 * select\_into\_insert\_tablet\_plan\_row\_limit
 
 |    Name     | select\_into\_insert\_tablet\_plan\_row\_limit                                                                                            |
diff --git a/docs/zh/UserGuide/Reference/Common-Config-Manual.md b/docs/zh/UserGuide/Reference/Common-Config-Manual.md
index 0068927b3c..98efd6cc05 100644
--- a/docs/zh/UserGuide/Reference/Common-Config-Manual.md
+++ b/docs/zh/UserGuide/Reference/Common-Config-Manual.md
@@ -1439,6 +1439,15 @@ IoTDB ConfigNode 和 DataNode 的公共配置参数位于 `conf` 目录下。
 
 #### SELECT-INTO配置
 
+* into\_operation\_buffer\_size\_in\_byte
+
+|     名字     | into\_operation\_buffer\_size\_in\_byte                              |
+| :----------: | :-------------------------------------------------------------------- |
+|     描述     | 执行 select-into 语句时,待写入数据占用的最大内存(单位:Byte) |
+|     类型     | int64                                                        |
+|    默认值    | 100MB                                                        |
+| 改后生效方式 | 热加载                                                     |
+
 * select\_into\_insert\_tablet\_plan\_row\_limit
 
 |     名字     | select\_into\_insert\_tablet\_plan\_row\_limit                              |
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 24c7b2ee82..5650b54638 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -671,6 +671,9 @@ public class IoTDBConfig {
    */
   private long continuousQueryMinimumEveryInterval = 1000;
 
+  /** The maximum amount of memory, in bytes, that ONE SELECT INTO operation may use. */
+  private long intoOperationBufferSizeInByte = 100 * 1024 * 1024L;
+
   /**
    * The maximum number of rows can be processed in insert-tablet-plan when executing select-into
    * statements.
@@ -1878,14 +1881,22 @@ public class IoTDBConfig {
     this.continuousQueryMinimumEveryInterval = minimumEveryInterval;
   }
 
-  public void setSelectIntoInsertTabletPlanRowLimit(int selectIntoInsertTabletPlanRowLimit) {
-    this.selectIntoInsertTabletPlanRowLimit = selectIntoInsertTabletPlanRowLimit;
+  public long getIntoOperationBufferSizeInByte() {
+    return intoOperationBufferSizeInByte;
+  }
+
+  public void setIntoOperationBufferSizeInByte(long intoOperationBufferSizeInByte) {
+    this.intoOperationBufferSizeInByte = intoOperationBufferSizeInByte;
   }
 
   public int getSelectIntoInsertTabletPlanRowLimit() {
     return selectIntoInsertTabletPlanRowLimit;
   }
 
+  public void setSelectIntoInsertTabletPlanRowLimit(int selectIntoInsertTabletPlanRowLimit) {
+    this.selectIntoInsertTabletPlanRowLimit = selectIntoInsertTabletPlanRowLimit;
+  }
+
   public int getIntoOperationExecutionThreadCount() {
     return intoOperationExecutionThreadCount;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 57ac532d57..0010633da9 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -901,6 +901,11 @@ public class IoTDBDescriptor {
     // mqtt
     loadMqttProps(properties);
 
+    conf.setIntoOperationBufferSizeInByte(
+        Long.parseLong(
+            properties.getProperty(
+                "into_operation_buffer_size_in_byte",
+                String.valueOf(conf.getIntoOperationBufferSizeInByte()))));
     conf.setSelectIntoInsertTabletPlanRowLimit(
         Integer.parseInt(
             properties.getProperty(
@@ -1454,6 +1459,13 @@ public class IoTDBDescriptor {
               properties.getProperty(
                   "merge_write_throughput_mb_per_sec",
                   Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
+
+      // update select into operation max buffer size
+      conf.setIntoOperationBufferSizeInByte(
+          Long.parseLong(
+              properties.getProperty(
+                  "into_operation_buffer_size_in_byte",
+                  String.valueOf(conf.getIntoOperationBufferSizeInByte()))));
       // update insert-tablet-plan's row limit for select-into
       conf.setSelectIntoInsertTabletPlanRowLimit(
           Integer.parseInt(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 3d65c1659c..8816776d88 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.client.DataNodeInternalClient;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.IntoProcessException;
@@ -78,8 +79,9 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
 
   protected boolean finished = false;
 
-  private final long maxRetainedSize;
-  private final long maxReturnSize;
+  protected int maxRowNumberInStatement;
+  private long maxRetainedSize;
+  private long maxReturnSize;
 
   protected final List<Type> typeConvertors;
 
@@ -89,7 +91,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
       List<TSDataType> inputColumnTypes,
       Map<String, InputLocation> sourceColumnToInputLocationMap,
       ExecutorService intoOperationExecutor,
-      long maxStatementSize) {
+      long statementSizePerLine) {
     this.operatorContext = operatorContext;
     this.child = child;
     this.typeConvertors =
@@ -97,7 +99,23 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
 
     this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap;
     this.writeOperationExecutor = intoOperationExecutor;
+    initMemoryEstimates(statementSizePerLine);
+  }
+
+  private void initMemoryEstimates(long statementSizePerLine) {
+    long intoOperationBufferSizeInByte =
+        IoTDBDescriptor.getInstance().getConfig().getIntoOperationBufferSizeInByte();
+    long memAllowedMaxRowNumber = Math.max(intoOperationBufferSizeInByte / statementSizePerLine, 1);
+    if (memAllowedMaxRowNumber > Integer.MAX_VALUE) {
+      memAllowedMaxRowNumber = Integer.MAX_VALUE;
+    }
+    int maxRowNumberInStatement =
+        Math.min(
+            (int) memAllowedMaxRowNumber,
+            IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit());
+    long maxStatementSize = maxRowNumberInStatement * statementSizePerLine;
 
+    this.maxRowNumberInStatement = maxRowNumberInStatement;
     this.maxRetainedSize = child.calculateMaxReturnSize() + maxStatementSize;
     this.maxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
   }
@@ -209,7 +227,8 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
       Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
       Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
       Map<String, Boolean> targetDeviceToAlignedMap,
-      List<Type> sourceTypeConvertors) {
+      List<Type> sourceTypeConvertors,
+      int maxRowNumberInStatement) {
     List<InsertTabletStatementGenerator> insertTabletStatementGenerators =
         new ArrayList<>(targetPathToSourceInputLocationMap.size());
     for (Map.Entry<PartialPath, Map<String, InputLocation>> entry :
@@ -221,7 +240,8 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
               entry.getValue(),
               targetPathToDataTypeMap.get(targetDevice),
               targetDeviceToAlignedMap.get(targetDevice.toString()),
-              sourceTypeConvertors);
+              sourceTypeConvertors,
+              maxRowNumberInStatement);
       insertTabletStatementGenerators.add(generator);
     }
     return insertTabletStatementGenerators;
@@ -322,10 +342,14 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
     return maxRetainedSize + child.calculateRetainedSizeAfterCallingNext();
   }
 
+  @TestOnly
+  public int getMaxRowNumberInStatement() {
+    return maxRowNumberInStatement;
+  }
+
   public static class InsertTabletStatementGenerator {
 
-    private final int TABLET_ROW_LIMIT =
-        IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
+    private final int rowLimit;
 
     private final PartialPath devicePath;
     private final boolean isAligned;
@@ -348,7 +372,8 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
         Map<String, InputLocation> measurementToInputLocationMap,
         Map<String, TSDataType> measurementToDataTypeMap,
         Boolean isAligned,
-        List<Type> sourceTypeConvertors) {
+        List<Type> sourceTypeConvertors,
+        int rowLimit) {
       this.devicePath = devicePath;
       this.isAligned = isAligned;
       this.measurements = measurementToInputLocationMap.keySet().toArray(new String[0]);
@@ -359,32 +384,33 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
         writtenCounter.put(measurement, new AtomicInteger(0));
       }
       this.sourceTypeConvertors = sourceTypeConvertors;
+      this.rowLimit = rowLimit;
       this.reset();
     }
 
     public void reset() {
       this.rowCount = 0;
-      this.times = new long[TABLET_ROW_LIMIT];
+      this.times = new long[rowLimit];
       this.columns = new Object[this.measurements.length];
       for (int i = 0; i < this.measurements.length; i++) {
         switch (dataTypes[i]) {
           case BOOLEAN:
-            columns[i] = new boolean[TABLET_ROW_LIMIT];
+            columns[i] = new boolean[rowLimit];
             break;
           case INT32:
-            columns[i] = new int[TABLET_ROW_LIMIT];
+            columns[i] = new int[rowLimit];
             break;
           case INT64:
-            columns[i] = new long[TABLET_ROW_LIMIT];
+            columns[i] = new long[rowLimit];
             break;
           case FLOAT:
-            columns[i] = new float[TABLET_ROW_LIMIT];
+            columns[i] = new float[rowLimit];
             break;
           case DOUBLE:
-            columns[i] = new double[TABLET_ROW_LIMIT];
+            columns[i] = new double[rowLimit];
             break;
           case TEXT:
-            columns[i] = new Binary[TABLET_ROW_LIMIT];
+            columns[i] = new Binary[rowLimit];
             Arrays.fill((Binary[]) columns[i], Binary.EMPTY_VALUE);
             break;
           default:
@@ -394,7 +420,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
       }
       this.bitMaps = new BitMap[this.measurements.length];
       for (int i = 0; i < this.bitMaps.length; ++i) {
-        this.bitMaps[i] = new BitMap(TABLET_ROW_LIMIT);
+        this.bitMaps[i] = new BitMap(rowLimit);
         this.bitMaps[i].markAll();
       }
     }
@@ -452,7 +478,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
 
         ++rowCount;
         ++lastReadIndex;
-        if (rowCount == TABLET_ROW_LIMIT) {
+        if (rowCount == rowLimit) {
           break;
         }
       }
@@ -460,7 +486,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
     }
 
     public boolean isFull() {
-      return rowCount == TABLET_ROW_LIMIT;
+      return rowCount == rowLimit;
     }
 
     public boolean isEmpty() {
@@ -475,7 +501,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
       insertTabletStatement.setDataTypes(dataTypes);
       insertTabletStatement.setRowCount(rowCount);
 
-      if (rowCount != TABLET_ROW_LIMIT) {
+      if (rowCount != rowLimit) {
         times = Arrays.copyOf(times, rowCount);
         for (int i = 0; i < columns.length; i++) {
           bitMaps[i] = bitMaps[i].getRegion(0, rowCount);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index 0ddd7ccc31..5f2d202219 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -64,14 +64,14 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
       Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap,
       Map<String, InputLocation> sourceColumnToInputLocationMap,
       ExecutorService intoOperationExecutor,
-      long maxStatementSize) {
+      long statementSizePerLine) {
     super(
         operatorContext,
         child,
         inputColumnTypes,
         sourceColumnToInputLocationMap,
         intoOperationExecutor,
-        maxStatementSize);
+        statementSizePerLine);
     this.deviceToTargetPathSourceInputLocationMap = deviceToTargetPathSourceInputLocationMap;
     this.deviceToTargetPathDataTypeMap = deviceToTargetPathDataTypeMap;
     this.targetDeviceToAlignedMap = targetDeviceToAlignedMap;
@@ -148,7 +148,8 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
         targetPathToSourceInputLocationMap,
         targetPathToDataTypeMap,
         targetDeviceToAlignedMap,
-        typeConvertors);
+        typeConvertors,
+        maxRowNumberInStatement);
   }
 
   private void updateResultTsBlock() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index 34a8edc88a..d5e65d262e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -52,21 +52,22 @@ public class IntoOperator extends AbstractIntoOperator {
       List<Pair<String, PartialPath>> sourceTargetPathPairList,
       Map<String, InputLocation> sourceColumnToInputLocationMap,
       ExecutorService intoOperationExecutor,
-      long maxStatementSize) {
+      long statementSizePerLine) {
     super(
         operatorContext,
         child,
         inputColumnTypes,
         sourceColumnToInputLocationMap,
         intoOperationExecutor,
-        maxStatementSize);
+        statementSizePerLine);
     this.sourceTargetPathPairList = sourceTargetPathPairList;
     insertTabletStatementGenerators =
         constructInsertTabletStatementGenerators(
             targetPathToSourceInputLocationMap,
             targetPathToDataTypeMap,
             targetDeviceToAlignedMap,
-            typeConvertors);
+            typeConvertors,
+            maxRowNumberInStatement);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 110a7c2cfc..2742b4bce2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
 import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
@@ -1622,10 +1621,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
 
     Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap =
         intoPathDescriptor.getTargetPathToDataTypeMap();
-
-    int rowLimit =
-        IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
-    long maxStatementSize = calculateStatementSizePerLine(targetPathToDataTypeMap) * rowLimit;
+    long statementSizePerLine = calculateStatementSizePerLine(targetPathToDataTypeMap);
 
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
 
@@ -1639,7 +1635,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         intoPathDescriptor.getSourceTargetPathPairList(),
         sourceColumnToInputLocationMap,
         FragmentInstanceManager.getInstance().getIntoOperationExecutor(),
-        maxStatementSize);
+        statementSizePerLine);
   }
 
   @Override
@@ -1680,10 +1676,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
           calculateStatementSizePerLine(deviceToTargetPathDataTypeMap.get(sourceDevice));
     }
 
-    int rowLimit =
-        IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
-    long maxStatementSize = statementSizePerLine * rowLimit;
-
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new DeviceViewIntoOperator(
         operatorContext,
@@ -1695,7 +1687,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         deviceViewIntoPathDescriptor.getDeviceToSourceTargetPathPairListMap(),
         sourceColumnToInputLocationMap,
         FragmentInstanceManager.getInstance().getIntoOperationExecutor(),
-        maxStatementSize);
+        statementSizePerLine);
   }
 
   private Map<String, InputLocation> constructSourceColumnToInputLocationMap(PlanNode node) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index d6343d3392..ec3fa4a9b9 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.FilterAndProjectOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.IntoOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
@@ -1422,4 +1423,80 @@ public class OperatorMemoryTest {
         expectedMaxRetainSize + expectedChildrenRetainedSize,
         aggregationOperator.calculateRetainedSizeAfterCallingNext());
   }
+
+  @Test
+  public void intoOperatorTest() {
+    Operator child = Mockito.mock(Operator.class);
+    Mockito.when(child.calculateMaxPeekMemory())
+        .thenReturn((long) DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+    Mockito.when(child.calculateMaxReturnSize())
+        .thenReturn((long) DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+    Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+
+    long statementSizePerLine1 = 8 + 1000 * (4 + 8 + 4 + 8 + 1 + 512);
+    IntoOperator intoOperator1 = createIntoOperator(child, statementSizePerLine1);
+    int expectedMaxRowNumber = 195;
+    long expectedMaxStatementSize = expectedMaxRowNumber * statementSizePerLine1;
+    assertEquals(expectedMaxRowNumber, intoOperator1.getMaxRowNumberInStatement());
+    assertEquals(
+        expectedMaxStatementSize + 3L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        intoOperator1.calculateMaxPeekMemory());
+    assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, intoOperator1.calculateMaxReturnSize());
+    assertEquals(
+        expectedMaxStatementSize + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        intoOperator1.calculateRetainedSizeAfterCallingNext());
+
+    long statementSizePerLine2 = 8 + 1000 * (4 + 8 + 4 + 8 + 1);
+    IntoOperator intoOperator2 = createIntoOperator(child, statementSizePerLine2);
+    expectedMaxRowNumber = 4192;
+    expectedMaxStatementSize = expectedMaxRowNumber * statementSizePerLine2;
+    assertEquals(expectedMaxRowNumber, intoOperator2.getMaxRowNumberInStatement());
+    assertEquals(
+        expectedMaxStatementSize + 3L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        intoOperator2.calculateMaxPeekMemory());
+    assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, intoOperator2.calculateMaxReturnSize());
+    assertEquals(
+        expectedMaxStatementSize + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        intoOperator2.calculateRetainedSizeAfterCallingNext());
+
+    long statementSizePerLine3 = 8 + 100 * (4 + 8 + 4 + 8 + 1);
+    IntoOperator intoOperator3 = createIntoOperator(child, statementSizePerLine3);
+    expectedMaxRowNumber = 10000;
+    expectedMaxStatementSize = expectedMaxRowNumber * statementSizePerLine3;
+    assertEquals(expectedMaxRowNumber, intoOperator3.getMaxRowNumberInStatement());
+    assertEquals(
+        expectedMaxStatementSize + 3L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        intoOperator3.calculateMaxPeekMemory());
+    assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, intoOperator3.calculateMaxReturnSize());
+    assertEquals(
+        expectedMaxStatementSize + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        intoOperator3.calculateRetainedSizeAfterCallingNext());
+
+    long statementSizePerLine4 = 8 + 1000000 * (4 + 8 + 4 + 8 + 1 + 512);
+    IntoOperator intoOperator4 = createIntoOperator(child, statementSizePerLine4);
+    expectedMaxRowNumber = 1;
+    expectedMaxStatementSize = expectedMaxRowNumber * statementSizePerLine4;
+    assertEquals(expectedMaxRowNumber, intoOperator4.getMaxRowNumberInStatement());
+    assertEquals(
+        expectedMaxStatementSize + 3L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        intoOperator4.calculateMaxPeekMemory());
+    assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, intoOperator4.calculateMaxReturnSize());
+    assertEquals(
+        expectedMaxStatementSize + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        intoOperator4.calculateRetainedSizeAfterCallingNext());
+  }
+
+  private IntoOperator createIntoOperator(Operator child, long statementSizePerLine) {
+    return new IntoOperator(
+        Mockito.mock(OperatorContext.class),
+        child,
+        Collections.emptyList(),
+        Collections.emptyMap(),
+        Collections.emptyMap(),
+        Collections.emptyMap(),
+        Collections.emptyList(),
+        Collections.emptyMap(),
+        null,
+        statementSizePerLine);
+  }
 }