You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/08/14 16:16:14 UTC

[iotdb] 01/02: finish memory calculate for SingleInputAggregationOperator

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c1691b42055fe6429821d10662aa215826e66c39
Author: liuminghui233 <54...@qq.com>
AuthorDate: Sun Aug 14 23:58:32 2022 +0800

    finish memory calculate for SingleInputAggregationOperator
---
 .../process/RawDataAggregationOperator.java        | 23 +++---------
 .../process/SingleInputAggregationOperator.java    | 31 ++++++++++++----
 .../process/SlidingWindowAggregationOperator.java  | 17 +++------
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 41 ++++++++++++++++++++--
 .../operator/RawDataAggregationOperatorTest.java   |  5 ++-
 .../SlidingWindowAggregationOperatorTest.java      |  4 ++-
 6 files changed, 78 insertions(+), 43 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index c88345c901..d95968850d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -20,17 +20,15 @@
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.util.List;
 
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateAggregationFromRawData;
-import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 /**
  * RawDataAggregationOperator is used to process raw data tsBlock input calculating using value
@@ -43,13 +41,15 @@ import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEF
  * <p>Return aggregation result in many time intervals once.
  */
 public class RawDataAggregationOperator extends SingleInputAggregationOperator {
+
   public RawDataAggregationOperator(
       OperatorContext operatorContext,
       List<Aggregator> aggregators,
+      ITimeRangeIterator timeRangeIterator,
       Operator child,
       boolean ascending,
-      GroupByTimeParameter groupByTimeParameter) {
-    super(operatorContext, aggregators, child, ascending, groupByTimeParameter, true);
+      long maxReturnSize) {
+    super(operatorContext, aggregators, child, ascending, timeRangeIterator, maxReturnSize);
   }
 
   @Override
@@ -81,17 +81,4 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
     inputTsBlock = calcResult.getRight();
     return calcResult.getLeft();
   }
-
-  @Override
-  public long calculateMaxPeekMemory() {
-    return calculateMaxReturnSize() + child.calculateMaxReturnSize();
-  }
-
-  @Override
-  public long calculateMaxReturnSize() {
-    // time + all value columns
-    return (1L + inputTsBlock.getValueColumnCount())
-            * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
-        + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
index c59de13988..1d746f1150 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -37,7 +36,6 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
-import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 
 public abstract class SingleInputAggregationOperator implements ProcessOperator {
 
@@ -57,26 +55,30 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
   // using for building result tsBlock
   protected final TsBlockBuilder resultTsBlockBuilder;
 
+  protected final long maxRetainedSize;
+  protected final long maxReturnSize;
+
   public SingleInputAggregationOperator(
       OperatorContext operatorContext,
       List<Aggregator> aggregators,
       Operator child,
       boolean ascending,
-      GroupByTimeParameter groupByTimeParameter,
-      boolean outputPartialTimeWindow) {
+      ITimeRangeIterator timeRangeIterator,
+      long maxReturnSize) {
     this.operatorContext = operatorContext;
     this.ascending = ascending;
     this.child = child;
     this.aggregators = aggregators;
-
-    this.timeRangeIterator =
-        initTimeRangeIterator(groupByTimeParameter, ascending, outputPartialTimeWindow);
+    this.timeRangeIterator = timeRangeIterator;
 
     List<TSDataType> dataTypes = new ArrayList<>();
     for (Aggregator aggregator : aggregators) {
       dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
     }
     this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+
+    this.maxRetainedSize = child.calculateMaxReturnSize();
+    this.maxReturnSize = maxReturnSize;
   }
 
   @Override
@@ -146,4 +148,19 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
     curTimeRange = null;
     appendAggregationResult(resultTsBlockBuilder, aggregators, timeRangeIterator);
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return maxReturnSize + maxRetainedSize + child.calculateRetainedSizeAfterCallingNext();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return maxRetainedSize + child.calculateRetainedSizeAfterCallingNext();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
index 9f1050772c..d73491d3e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -30,7 +30,6 @@ import java.util.List;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
-import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 public class SlidingWindowAggregationOperator extends SingleInputAggregationOperator {
 
@@ -41,10 +40,12 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper
   public SlidingWindowAggregationOperator(
       OperatorContext operatorContext,
       List<Aggregator> aggregators,
+      ITimeRangeIterator timeRangeIterator,
       Operator child,
       boolean ascending,
-      GroupByTimeParameter groupByTimeParameter) {
-    super(operatorContext, aggregators, child, ascending, groupByTimeParameter, false);
+      GroupByTimeParameter groupByTimeParameter,
+      long maxReturnSize) {
+    super(operatorContext, aggregators, child, ascending, timeRangeIterator, maxReturnSize);
     checkArgument(
         groupByTimeParameter != null,
         "GroupByTimeParameter cannot be null in SlidingWindowAggregationOperator");
@@ -105,14 +106,4 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper
     }
     curSubTimeRange = null;
   }
-
-  @Override
-  public long calculateMaxPeekMemory() {
-    return calculateMaxReturnSize() + child.calculateMaxReturnSize();
-  }
-
-  @Override
-  public long calculateMaxReturnSize() {
-    return 2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index cfb2b42a57..50d65c2bbb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -188,6 +188,7 @@ import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateMaxAggregationResultSize;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateMaxAggregationResultSizeForLastQuery;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 import static org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode;
@@ -1088,7 +1089,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     boolean ascending = node.getScanOrder() == Ordering.ASC;
     List<Aggregator> aggregators = new ArrayList<>();
     Map<String, List<InputLocation>> layout = makeLayout(node);
-    for (AggregationDescriptor descriptor : node.getAggregationDescriptorList()) {
+    List<AggregationDescriptor> aggregationDescriptors = node.getAggregationDescriptorList();
+    for (AggregationDescriptor descriptor : aggregationDescriptors) {
       List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout);
       aggregators.add(
           SlidingWindowAggregatorFactory.createSlidingWindowAggregator(
@@ -1102,9 +1104,25 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
               descriptor.getStep()));
     }
 
+    GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+    ITimeRangeIterator timeRangeIterator =
+        initTimeRangeIterator(groupByTimeParameter, ascending, false);
+    long maxReturnSize =
+        calculateMaxAggregationResultSize(
+            aggregationDescriptors,
+            timeRangeIterator,
+            groupByTimeParameter != null,
+            context.getTypeProvider());
+
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
     return new SlidingWindowAggregationOperator(
-        operatorContext, aggregators, child, ascending, node.getGroupByTimeParameter());
+        operatorContext,
+        aggregators,
+        timeRangeIterator,
+        child,
+        ascending,
+        groupByTimeParameter,
+        maxReturnSize);
   }
 
   @Override
@@ -1149,6 +1167,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     boolean ascending = node.getScanOrder() == Ordering.ASC;
     List<Aggregator> aggregators = new ArrayList<>();
     Map<String, List<InputLocation>> layout = makeLayout(node);
+    List<AggregationDescriptor> aggregationDescriptors = node.getAggregationDescriptorList();
     for (AggregationDescriptor descriptor : node.getAggregationDescriptorList()) {
       List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout);
       aggregators.add(
@@ -1174,8 +1193,24 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                   node.getPlanNodeId(),
                   RawDataAggregationOperator.class.getSimpleName());
       context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
+
+      GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+      ITimeRangeIterator timeRangeIterator =
+          initTimeRangeIterator(groupByTimeParameter, ascending, true);
+      long maxReturnSize =
+          calculateMaxAggregationResultSize(
+              aggregationDescriptors,
+              timeRangeIterator,
+              groupByTimeParameter != null,
+              context.getTypeProvider());
+
       return new RawDataAggregationOperator(
-          operatorContext, aggregators, children.get(0), ascending, node.getGroupByTimeParameter());
+          operatorContext,
+          aggregators,
+          timeRangeIterator,
+          children.get(0),
+          ascending,
+          maxReturnSize);
     } else {
       OperatorContext operatorContext =
           context
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
index 6a9902f24b..f25b0d2fcd 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
@@ -64,6 +64,8 @@ import java.util.concurrent.ExecutorService;
 
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static org.junit.Assert.assertEquals;
 
 public class RawDataAggregationOperatorTest {
@@ -381,8 +383,9 @@ public class RawDataAggregationOperatorTest {
     return new RawDataAggregationOperator(
         fragmentInstanceContext.getOperatorContexts().get(3),
         aggregators,
+        initTimeRangeIterator(groupByTimeParameter, true, true),
         timeJoinOperator,
         true,
-        groupByTimeParameter);
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
index 7eebdedbcc..a0ed242e63 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
@@ -258,8 +258,10 @@ public class SlidingWindowAggregationOperatorTest {
     return new SlidingWindowAggregationOperator(
         fragmentInstanceContext.getOperatorContexts().get(1),
         finalAggregators,
+        initTimeRangeIterator(groupByTimeParameter, ascending, false),
         seriesAggregationScanOperator,
         ascending,
-        groupByTimeParameter);
+        groupByTimeParameter,
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
   }
 }