You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/05/07 15:05:00 UTC

[iotdb] branch lmh/OptConcatOpMem created (now 433a8d478a3)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a change to branch lmh/OptConcatOpMem
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 433a8d478a3 opt & add test

This branch includes the following new commits:

     new 433a8d478a3 opt & add test

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: opt & add test

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/OptConcatOpMem
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 433a8d478a34e3d8711a6aa2aab4044b17a995b2
Author: liuminghui233 <54...@qq.com>
AuthorDate: Sun May 7 23:04:40 2023 +0800

    opt & add test
---
 .../process/join/HorizontallyConcatOperator.java   |  5 +---
 .../mpp/execution/operator/OperatorMemoryTest.java | 34 ++++++++++++++++++++++
 2 files changed, 35 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
index 1d3c8ce57b5..94b00ce6c31 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.mpp.execution.operator.process.join;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.operator.process.AbstractConsumeAllOperator;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
@@ -143,9 +142,7 @@ public class HorizontallyConcatOperator extends AbstractConsumeAllOperator {
 
   @Override
   public long calculateMaxReturnSize() {
-    // time + all value columns
-    return (1L + outputColumnCount)
-        * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    return children.stream().mapToLong(Operator::calculateMaxReturnSize).sum();
   }
 
   @Override
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index ec3fa4a9b98..32271512dc1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -46,6 +46,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregati
 import org.apache.iotdb.db.mpp.execution.operator.process.SortOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.HorizontallyConcatOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.RowBasedTimeJoinOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.AbstractUpdateLastCacheOperator;
@@ -465,6 +466,39 @@ public class OperatorMemoryTest {
     assertEquals(3 * 64 * 1024L, rowBasedTimeJoinOperator.calculateRetainedSizeAfterCallingNext());
   }
 
+  @Test
+  public void horizontallyConcatOperatorTest() {
+    long expectedMaxReturnSize = 0;
+    long expectedMaxPeekMemory = 0;
+    long childrenMaxPeekMemory = 0;
+
+    List<Operator> children = new ArrayList<>(4);
+    for (int i = 0; i < 4; i++) {
+      Operator child = Mockito.mock(Operator.class);
+      Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+      Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+      Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+      children.add(child);
+
+      expectedMaxReturnSize += child.calculateMaxReturnSize();
+      childrenMaxPeekMemory =
+          Math.max(childrenMaxPeekMemory, expectedMaxPeekMemory + child.calculateMaxPeekMemory());
+      expectedMaxPeekMemory += child.calculateMaxReturnSize();
+    }
+
+    expectedMaxPeekMemory =
+        Math.max(expectedMaxPeekMemory + expectedMaxReturnSize, childrenMaxPeekMemory);
+
+    HorizontallyConcatOperator horizontallyConcatOperator =
+        new HorizontallyConcatOperator(
+            Mockito.mock(OperatorContext.class), children, Collections.emptyList());
+
+    assertEquals(expectedMaxPeekMemory, horizontallyConcatOperator.calculateMaxPeekMemory());
+    assertEquals(expectedMaxReturnSize, horizontallyConcatOperator.calculateMaxReturnSize());
+    assertEquals(
+        3 * 64 * 1024L, horizontallyConcatOperator.calculateRetainedSizeAfterCallingNext());
+  }
+
   @Test
   public void sortOperatorTest() {
     Operator child = Mockito.mock(Operator.class);