You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/08/14 16:16:14 UTC
[iotdb] 01/02: finish memory calculate for SingleInputAggregationOperator
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c1691b42055fe6429821d10662aa215826e66c39
Author: liuminghui233 <54...@qq.com>
AuthorDate: Sun Aug 14 23:58:32 2022 +0800
finish memory calculate for SingleInputAggregationOperator
---
.../process/RawDataAggregationOperator.java | 23 +++---------
.../process/SingleInputAggregationOperator.java | 31 ++++++++++++----
.../process/SlidingWindowAggregationOperator.java | 17 +++------
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 41 ++++++++++++++++++++--
.../operator/RawDataAggregationOperatorTest.java | 5 ++-
.../SlidingWindowAggregationOperatorTest.java | 4 ++-
6 files changed, 78 insertions(+), 43 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index c88345c901..d95968850d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -20,17 +20,15 @@
package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.utils.Pair;
import java.util.List;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateAggregationFromRawData;
-import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
/**
* RawDataAggregationOperator is used to process raw data tsBlock input calculating using value
@@ -43,13 +41,15 @@ import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEF
* <p>Return aggregation result in many time intervals once.
*/
public class RawDataAggregationOperator extends SingleInputAggregationOperator {
+
public RawDataAggregationOperator(
OperatorContext operatorContext,
List<Aggregator> aggregators,
+ ITimeRangeIterator timeRangeIterator,
Operator child,
boolean ascending,
- GroupByTimeParameter groupByTimeParameter) {
- super(operatorContext, aggregators, child, ascending, groupByTimeParameter, true);
+ long maxReturnSize) {
+ super(operatorContext, aggregators, child, ascending, timeRangeIterator, maxReturnSize);
}
@Override
@@ -81,17 +81,4 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
inputTsBlock = calcResult.getRight();
return calcResult.getLeft();
}
-
- @Override
- public long calculateMaxPeekMemory() {
- return calculateMaxReturnSize() + child.calculateMaxReturnSize();
- }
-
- @Override
- public long calculateMaxReturnSize() {
- // time + all value columns
- return (1L + inputTsBlock.getValueColumnCount())
- * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
- + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
index c59de13988..1d746f1150 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -37,7 +36,6 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
-import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
public abstract class SingleInputAggregationOperator implements ProcessOperator {
@@ -57,26 +55,30 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
// using for building result tsBlock
protected final TsBlockBuilder resultTsBlockBuilder;
+ protected final long maxRetainedSize;
+ protected final long maxReturnSize;
+
public SingleInputAggregationOperator(
OperatorContext operatorContext,
List<Aggregator> aggregators,
Operator child,
boolean ascending,
- GroupByTimeParameter groupByTimeParameter,
- boolean outputPartialTimeWindow) {
+ ITimeRangeIterator timeRangeIterator,
+ long maxReturnSize) {
this.operatorContext = operatorContext;
this.ascending = ascending;
this.child = child;
this.aggregators = aggregators;
-
- this.timeRangeIterator =
- initTimeRangeIterator(groupByTimeParameter, ascending, outputPartialTimeWindow);
+ this.timeRangeIterator = timeRangeIterator;
List<TSDataType> dataTypes = new ArrayList<>();
for (Aggregator aggregator : aggregators) {
dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
}
this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+
+ this.maxRetainedSize = child.calculateMaxReturnSize();
+ this.maxReturnSize = maxReturnSize;
}
@Override
@@ -146,4 +148,19 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
curTimeRange = null;
appendAggregationResult(resultTsBlockBuilder, aggregators, timeRangeIterator);
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return maxReturnSize + maxRetainedSize + child.calculateRetainedSizeAfterCallingNext();
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return maxReturnSize;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return maxRetainedSize + child.calculateRetainedSizeAfterCallingNext();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
index 9f1050772c..d73491d3e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -30,7 +30,6 @@ import java.util.List;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
-import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
public class SlidingWindowAggregationOperator extends SingleInputAggregationOperator {
@@ -41,10 +40,12 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper
public SlidingWindowAggregationOperator(
OperatorContext operatorContext,
List<Aggregator> aggregators,
+ ITimeRangeIterator timeRangeIterator,
Operator child,
boolean ascending,
- GroupByTimeParameter groupByTimeParameter) {
- super(operatorContext, aggregators, child, ascending, groupByTimeParameter, false);
+ GroupByTimeParameter groupByTimeParameter,
+ long maxReturnSize) {
+ super(operatorContext, aggregators, child, ascending, timeRangeIterator, maxReturnSize);
checkArgument(
groupByTimeParameter != null,
"GroupByTimeParameter cannot be null in SlidingWindowAggregationOperator");
@@ -105,14 +106,4 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper
}
curSubTimeRange = null;
}
-
- @Override
- public long calculateMaxPeekMemory() {
- return calculateMaxReturnSize() + child.calculateMaxReturnSize();
- }
-
- @Override
- public long calculateMaxReturnSize() {
- return 2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index cfb2b42a57..50d65c2bbb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -188,6 +188,7 @@ import java.util.Objects;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateMaxAggregationResultSize;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateMaxAggregationResultSizeForLastQuery;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
import static org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode;
@@ -1088,7 +1089,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
boolean ascending = node.getScanOrder() == Ordering.ASC;
List<Aggregator> aggregators = new ArrayList<>();
Map<String, List<InputLocation>> layout = makeLayout(node);
- for (AggregationDescriptor descriptor : node.getAggregationDescriptorList()) {
+ List<AggregationDescriptor> aggregationDescriptors = node.getAggregationDescriptorList();
+ for (AggregationDescriptor descriptor : aggregationDescriptors) {
List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout);
aggregators.add(
SlidingWindowAggregatorFactory.createSlidingWindowAggregator(
@@ -1102,9 +1104,25 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
descriptor.getStep()));
}
+ GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+ ITimeRangeIterator timeRangeIterator =
+ initTimeRangeIterator(groupByTimeParameter, ascending, false);
+ long maxReturnSize =
+ calculateMaxAggregationResultSize(
+ aggregationDescriptors,
+ timeRangeIterator,
+ groupByTimeParameter != null,
+ context.getTypeProvider());
+
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
return new SlidingWindowAggregationOperator(
- operatorContext, aggregators, child, ascending, node.getGroupByTimeParameter());
+ operatorContext,
+ aggregators,
+ timeRangeIterator,
+ child,
+ ascending,
+ groupByTimeParameter,
+ maxReturnSize);
}
@Override
@@ -1149,6 +1167,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
boolean ascending = node.getScanOrder() == Ordering.ASC;
List<Aggregator> aggregators = new ArrayList<>();
Map<String, List<InputLocation>> layout = makeLayout(node);
+ List<AggregationDescriptor> aggregationDescriptors = node.getAggregationDescriptorList();
for (AggregationDescriptor descriptor : node.getAggregationDescriptorList()) {
List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout);
aggregators.add(
@@ -1174,8 +1193,24 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
node.getPlanNodeId(),
RawDataAggregationOperator.class.getSimpleName());
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
+
+ GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+ ITimeRangeIterator timeRangeIterator =
+ initTimeRangeIterator(groupByTimeParameter, ascending, true);
+ long maxReturnSize =
+ calculateMaxAggregationResultSize(
+ aggregationDescriptors,
+ timeRangeIterator,
+ groupByTimeParameter != null,
+ context.getTypeProvider());
+
return new RawDataAggregationOperator(
- operatorContext, aggregators, children.get(0), ascending, node.getGroupByTimeParameter());
+ operatorContext,
+ aggregators,
+ timeRangeIterator,
+ children.get(0),
+ ascending,
+ maxReturnSize);
} else {
OperatorContext operatorContext =
context
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
index 6a9902f24b..f25b0d2fcd 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
@@ -64,6 +64,8 @@ import java.util.concurrent.ExecutorService;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
import static org.junit.Assert.assertEquals;
public class RawDataAggregationOperatorTest {
@@ -381,8 +383,9 @@ public class RawDataAggregationOperatorTest {
return new RawDataAggregationOperator(
fragmentInstanceContext.getOperatorContexts().get(3),
aggregators,
+ initTimeRangeIterator(groupByTimeParameter, true, true),
timeJoinOperator,
true,
- groupByTimeParameter);
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
index 7eebdedbcc..a0ed242e63 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
@@ -258,8 +258,10 @@ public class SlidingWindowAggregationOperatorTest {
return new SlidingWindowAggregationOperator(
fragmentInstanceContext.getOperatorContexts().get(1),
finalAggregators,
+ initTimeRangeIterator(groupByTimeParameter, ascending, false),
seriesAggregationScanOperator,
ascending,
- groupByTimeParameter);
+ groupByTimeParameter,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
}
}