You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/05/14 12:40:58 UTC

[iotdb] branch lmh/OptConcatOpMem1.1 created (now fc4f4e6d5d9)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a change to branch lmh/OptConcatOpMem1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at fc4f4e6d5d9 [IOTDB-5846] Optimize the memory estimate for HorizontallyConcatOperator (#9781)

This branch includes the following new commits:

     new fc4f4e6d5d9 [IOTDB-5846] Optimize the memory estimate for HorizontallyConcatOperator (#9781)

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [IOTDB-5846] Optimize the memory estimate for HorizontallyConcatOperator (#9781)

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/OptConcatOpMem1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fc4f4e6d5d9e6c41a4b54fb6f414e44bf0c67960
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Sun May 14 20:25:16 2023 +0800

    [IOTDB-5846] Optimize the memory estimate for HorizontallyConcatOperator (#9781)
    
    (cherry picked from commit 10d4eba19434513caa44e8e42c3fc1728dda85a1)
---
 .../process/join/HorizontallyConcatOperator.java   |  8 +----
 .../mpp/execution/operator/OperatorMemoryTest.java | 34 ++++++++++++++++++++++
 2 files changed, 35 insertions(+), 7 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
index f26fb7dfb6d..88ed9075263 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.mpp.execution.operator.process.join;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.operator.process.AbstractConsumeAllOperator;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
@@ -46,8 +45,6 @@ public class HorizontallyConcatOperator extends AbstractConsumeAllOperator {
   /** start index for each input TsBlocks and size of it is equal to inputTsBlocks */
   private final int[] inputIndex;
 
-  private final int outputColumnCount;
-
   private final TsBlockBuilder tsBlockBuilder;
 
   private boolean finished;
@@ -58,7 +55,6 @@ public class HorizontallyConcatOperator extends AbstractConsumeAllOperator {
     checkArgument(
         !children.isEmpty(), "child size of VerticallyConcatOperator should be larger than 0");
     this.inputIndex = new int[this.inputOperatorsCount];
-    this.outputColumnCount = dataTypes.size();
     this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
   }
 
@@ -145,9 +141,7 @@ public class HorizontallyConcatOperator extends AbstractConsumeAllOperator {
 
   @Override
   public long calculateMaxReturnSize() {
-    // time + all value columns
-    return (1L + outputColumnCount)
-        * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    return children.stream().mapToLong(Operator::calculateMaxReturnSize).sum();
   }
 
   @Override
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index ec3fa4a9b98..32271512dc1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -46,6 +46,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregati
 import org.apache.iotdb.db.mpp.execution.operator.process.SortOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.HorizontallyConcatOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.RowBasedTimeJoinOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.AbstractUpdateLastCacheOperator;
@@ -465,6 +466,39 @@ public class OperatorMemoryTest {
     assertEquals(3 * 64 * 1024L, rowBasedTimeJoinOperator.calculateRetainedSizeAfterCallingNext());
   }
 
+  @Test
+  public void horizontallyConcatOperatorTest() {
+    long expectedMaxReturnSize = 0;
+    long expectedMaxPeekMemory = 0;
+    long childrenMaxPeekMemory = 0;
+
+    List<Operator> children = new ArrayList<>(4);
+    for (int i = 0; i < 4; i++) {
+      Operator child = Mockito.mock(Operator.class);
+      Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+      Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+      Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+      children.add(child);
+
+      expectedMaxReturnSize += child.calculateMaxReturnSize();
+      childrenMaxPeekMemory =
+          Math.max(childrenMaxPeekMemory, expectedMaxPeekMemory + child.calculateMaxPeekMemory());
+      expectedMaxPeekMemory += child.calculateMaxReturnSize();
+    }
+
+    expectedMaxPeekMemory =
+        Math.max(expectedMaxPeekMemory + expectedMaxReturnSize, childrenMaxPeekMemory);
+
+    HorizontallyConcatOperator horizontallyConcatOperator =
+        new HorizontallyConcatOperator(
+            Mockito.mock(OperatorContext.class), children, Collections.emptyList());
+
+    assertEquals(expectedMaxPeekMemory, horizontallyConcatOperator.calculateMaxPeekMemory());
+    assertEquals(expectedMaxReturnSize, horizontallyConcatOperator.calculateMaxReturnSize());
+    assertEquals(
+        3 * 64 * 1024L, horizontallyConcatOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
   @Test
   public void sortOperatorTest() {
     Operator child = Mockito.mock(Operator.class);