You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/05/14 12:25:21 UTC

[iotdb] branch master updated: [IOTDB-5846] Optimize the memory estimate for HorizontallyConcatOperator (#9781)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 10d4eba1943 [IOTDB-5846] Optimize the memory estimate for HorizontallyConcatOperator (#9781)
10d4eba1943 is described below

commit 10d4eba19434513caa44e8e42c3fc1728dda85a1
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Sun May 14 20:25:16 2023 +0800

    [IOTDB-5846] Optimize the memory estimate for HorizontallyConcatOperator (#9781)
---
 .../process/join/HorizontallyConcatOperator.java   |  8 +----
 .../mpp/execution/operator/OperatorMemoryTest.java | 34 ++++++++++++++++++++++
 2 files changed, 35 insertions(+), 7 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
index 1d3c8ce57b5..2bb083b9db4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.mpp.execution.operator.process.join;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.operator.process.AbstractConsumeAllOperator;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
@@ -46,8 +45,6 @@ public class HorizontallyConcatOperator extends AbstractConsumeAllOperator {
   /** start index for each input TsBlocks and size of it is equal to inputTsBlocks */
   private final int[] inputIndex;
 
-  private final int outputColumnCount;
-
   private final TsBlockBuilder tsBlockBuilder;
 
   private boolean finished;
@@ -58,7 +55,6 @@ public class HorizontallyConcatOperator extends AbstractConsumeAllOperator {
     checkArgument(
         !children.isEmpty(), "child size of VerticallyConcatOperator should be larger than 0");
     this.inputIndex = new int[this.inputOperatorsCount];
-    this.outputColumnCount = dataTypes.size();
     this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
   }
 
@@ -143,9 +139,7 @@ public class HorizontallyConcatOperator extends AbstractConsumeAllOperator {
 
   @Override
   public long calculateMaxReturnSize() {
-    // time + all value columns
-    return (1L + outputColumnCount)
-        * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    return children.stream().mapToLong(Operator::calculateMaxReturnSize).sum();
   }
 
   @Override
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index 2fcdff2e76a..bea10d48463 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregati
 import org.apache.iotdb.db.mpp.execution.operator.process.SortOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.HorizontallyConcatOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.RowBasedTimeJoinOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.AbstractUpdateLastCacheOperator;
@@ -466,6 +467,39 @@ public class OperatorMemoryTest {
     assertEquals(3 * 64 * 1024L, rowBasedTimeJoinOperator.calculateRetainedSizeAfterCallingNext());
   }
 
+  @Test
+  public void horizontallyConcatOperatorTest() {
+    long expectedMaxReturnSize = 0;
+    long expectedMaxPeekMemory = 0;
+    long childrenMaxPeekMemory = 0;
+
+    List<Operator> children = new ArrayList<>(4);
+    for (int i = 0; i < 4; i++) {
+      Operator child = Mockito.mock(Operator.class);
+      Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+      Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+      Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+      children.add(child);
+
+      expectedMaxReturnSize += child.calculateMaxReturnSize();
+      childrenMaxPeekMemory =
+          Math.max(childrenMaxPeekMemory, expectedMaxPeekMemory + child.calculateMaxPeekMemory());
+      expectedMaxPeekMemory += child.calculateMaxReturnSize();
+    }
+
+    expectedMaxPeekMemory =
+        Math.max(expectedMaxPeekMemory + expectedMaxReturnSize, childrenMaxPeekMemory);
+
+    HorizontallyConcatOperator horizontallyConcatOperator =
+        new HorizontallyConcatOperator(
+            Mockito.mock(OperatorContext.class), children, Collections.emptyList());
+
+    assertEquals(expectedMaxPeekMemory, horizontallyConcatOperator.calculateMaxPeekMemory());
+    assertEquals(expectedMaxReturnSize, horizontallyConcatOperator.calculateMaxReturnSize());
+    assertEquals(
+        3 * 64 * 1024L, horizontallyConcatOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
   @Test
   public void sortOperatorTest() {
     Operator child = Mockito.mock(Operator.class);