You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/10 13:03:32 UTC
[iotdb] branch MemoryControl updated: implement calculateRetainedSizeAfterCallingNext in some operators
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch MemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/MemoryControl by this push:
new 30ff39724d implement calculateRetainedSizeAfterCallingNext in some operators
30ff39724d is described below
commit 30ff39724d7c607a54bd74cb20b6b69a31ebef2e
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Wed Aug 10 21:03:23 2022 +0800
implement calculateRetainedSizeAfterCallingNext in some operators
---
.../iotdb/db/mpp/execution/operator/Operator.java | 7 +-
.../execution/operator/process/FillOperator.java | 8 ++-
.../execution/operator/process/LimitOperator.java | 5 ++
.../operator/process/LinearFillOperator.java | 8 ++-
.../execution/operator/process/OffsetOperator.java | 5 ++
.../execution/operator/process/SortOperator.java | 5 ++
.../process/join/RowBasedTimeJoinOperator.java | 22 +++++-
.../operator/process/join/TimeJoinOperator.java | 22 +++++-
.../process/last/LastQueryCollectOperator.java | 10 +++
.../process/last/LastQueryMergeOperator.java | 31 ++++++--
.../operator/process/last/LastQueryOperator.java | 14 +++-
.../process/last/LastQuerySortOperator.java | 19 +++--
.../process/last/UpdateLastCacheOperator.java | 5 ++
.../operator/source/AlignedSeriesScanOperator.java | 5 ++
.../operator/source/ExchangeOperator.java | 5 ++
.../operator/source/LastCacheScanOperator.java | 5 ++
.../operator/source/SeriesScanOperator.java | 5 ++
.../mpp/execution/operator/OperatorMemoryTest.java | 84 +++++++++++++++++-----
18 files changed, 221 insertions(+), 44 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
index 5ee15999b1..79ed840ebc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
@@ -59,7 +59,8 @@ public interface Operator extends AutoCloseable {
* be like: long estimatedOfCurrentOperator = XXXXX; return max(estimatedOfCurrentOperator,
* child1.calculateMaxPeekMemory(), child2.calculateMaxPeekMemory(), ....)
*
- * Each operator's MaxPeekMemory should also take retained size of each child operator into account.
+ * <p>Each operator's MaxPeekMemory should also take retained size of each child operator into
+ * account.
*
* @return estimated max memory footprint that the Operator Tree(rooted from this operator) will
* use while doing its own query processing
@@ -75,9 +76,9 @@ public interface Operator extends AutoCloseable {
}
// TODO remove the default while completing all the operators
-
/**
- * @return each operator's retained size after calling its next() method
+ * @return each operator's retained size(including all its children's retained size) after calling
+ * its next() method
*/
default long calculateRetainedSizeAfterCallingNext() {
return 0L;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
index 94fa08fcd9..c182168bd3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
@@ -97,7 +97,13 @@ public class FillOperator implements ProcessOperator {
// while doing constant and previous fill, we may need to copy the corresponding column if there
// exists null values
// so the max peek memory may be double
- return 2 * child.calculateMaxPeekMemory();
+ return 2 * child.calculateMaxPeekMemory() + child.calculateRetainedSizeAfterCallingNext();
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ // we can safely ignore one line cached in IFill
+ return child.calculateRetainedSizeAfterCallingNext();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
index 4c5d608902..726f5e7ecf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
@@ -90,4 +90,9 @@ public class LimitOperator implements ProcessOperator {
public long calculateMaxReturnSize() {
return child.calculateMaxReturnSize();
}
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return child.calculateRetainedSizeAfterCallingNext();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
index ee69557074..bcbb92a932 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
@@ -166,7 +166,7 @@ public class LinearFillOperator implements ProcessOperator {
// values, and we may also need to cache next TsBlock to get next not null value
// so the max peek memory may be triple or more, here we just use 3 as the estimated factor
// because in most cases, we will get next not null value in next TsBlock
- return 3 * child.calculateMaxPeekMemory();
+ return 3 * child.calculateMaxPeekMemory() + child.calculateRetainedSizeAfterCallingNext();
}
@Override
@@ -174,6 +174,12 @@ public class LinearFillOperator implements ProcessOperator {
return child.calculateMaxReturnSize();
}
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ // we can safely ignore two lines cached in LinearFill
+ return child.calculateRetainedSizeAfterCallingNext();
+ }
+
/**
* Judge whether we can use current cached TsBlock to fill Column
*
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
index a329b25346..572738d081 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
@@ -89,4 +89,9 @@ public class OffsetOperator implements ProcessOperator {
public long calculateMaxReturnSize() {
return child.calculateMaxReturnSize();
}
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return child.calculateRetainedSizeAfterCallingNext();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
index a7cd8d046a..5fa693b2a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
@@ -64,4 +64,9 @@ public class SortOperator implements ProcessOperator {
public long calculateMaxReturnSize() {
return 0;
}
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
index 386f840d6c..695893d400 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
@@ -247,12 +247,16 @@ public class RowBasedTimeJoinOperator implements ProcessOperator {
@Override
public long calculateMaxPeekMemory() {
- long maxPeekMemory = calculateMaxReturnSize();
+ long maxPeekMemory = 0;
long childrenMaxPeekMemory = 0;
for (Operator child : children) {
- maxPeekMemory += child.calculateMaxReturnSize();
- childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+ childrenMaxPeekMemory =
+ Math.max(childrenMaxPeekMemory, maxPeekMemory + child.calculateMaxPeekMemory());
+ maxPeekMemory +=
+ (child.calculateMaxReturnSize() + child.calculateRetainedSizeAfterCallingNext());
}
+
+ maxPeekMemory += calculateMaxReturnSize();
return Math.max(maxPeekMemory, childrenMaxPeekMemory);
}
@@ -263,6 +267,18 @@ public class RowBasedTimeJoinOperator implements ProcessOperator {
* TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
}
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ long currentRetainedSize = 0, minChildReturnSize = Long.MAX_VALUE;
+ for (Operator child : children) {
+ long maxReturnSize = child.calculateMaxReturnSize();
+ currentRetainedSize += (maxReturnSize + child.calculateRetainedSizeAfterCallingNext());
+ minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
+ }
+ // max cached TsBlock
+ return currentRetainedSize - minChildReturnSize;
+ }
+
private void updateTimeSelector(int index) {
timeSelector.add(inputTsBlocks[index].getTimeByIndex(inputIndex[index]));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
index bd5e005acd..3170838986 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
@@ -246,12 +246,16 @@ public class TimeJoinOperator implements ProcessOperator {
@Override
public long calculateMaxPeekMemory() {
- long maxPeekMemory = calculateMaxReturnSize();
+ long maxPeekMemory = 0;
long childrenMaxPeekMemory = 0;
for (Operator child : children) {
- maxPeekMemory += child.calculateMaxReturnSize();
- childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+ childrenMaxPeekMemory =
+ Math.max(childrenMaxPeekMemory, maxPeekMemory + child.calculateMaxPeekMemory());
+ maxPeekMemory +=
+ (child.calculateMaxReturnSize() + child.calculateRetainedSizeAfterCallingNext());
}
+
+ maxPeekMemory += calculateMaxReturnSize();
return Math.max(maxPeekMemory, childrenMaxPeekMemory);
}
@@ -262,6 +266,18 @@ public class TimeJoinOperator implements ProcessOperator {
* TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
}
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ long currentRetainedSize = 0, minChildReturnSize = Long.MAX_VALUE;
+ for (Operator child : children) {
+ long maxReturnSize = child.calculateMaxReturnSize();
+ currentRetainedSize += (maxReturnSize + child.calculateRetainedSizeAfterCallingNext());
+ minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
+ }
+ // max cached TsBlock
+ return currentRetainedSize - minChildReturnSize;
+ }
+
/**
* If the tsBlock of columnIndex is null or has no more data in the tsBlock, return true; else
* return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
index f679bd1769..b91e346372 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
@@ -91,6 +91,7 @@ public class LastQueryCollectOperator implements ProcessOperator {
long maxPeekMemory = 0;
for (Operator child : children) {
maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory());
+ maxPeekMemory = Math.max(maxPeekMemory, child.calculateRetainedSizeAfterCallingNext());
}
return maxPeekMemory;
}
@@ -103,4 +104,13 @@ public class LastQueryCollectOperator implements ProcessOperator {
}
return maxReturnMemory;
}
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ long sum = 0;
+ for (Operator operator : children) {
+ sum += operator.calculateRetainedSizeAfterCallingNext();
+ }
+ return sum;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
index af6e7b4bf7..5ad04bdc2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
@@ -225,16 +225,19 @@ public class LastQueryMergeOperator implements ProcessOperator {
@Override
public long calculateMaxPeekMemory() {
- // result size + cached TreeMap size
- long maxPeekMemory =
- calculateMaxReturnSize()
- + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()
- * MAP_NODE_RETRAINED_SIZE;
+ long maxPeekMemory = 0;
long childrenMaxPeekMemory = 0;
for (Operator child : children) {
- maxPeekMemory += child.calculateMaxReturnSize();
- childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+ childrenMaxPeekMemory =
+ Math.max(childrenMaxPeekMemory, maxPeekMemory + child.calculateMaxPeekMemory());
+ maxPeekMemory +=
+ (child.calculateMaxReturnSize() + child.calculateRetainedSizeAfterCallingNext());
}
+ // result size + cached TreeMap size
+ maxPeekMemory +=
+ (calculateMaxReturnSize()
+ + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()
+ * MAP_NODE_RETRAINED_SIZE);
return Math.max(maxPeekMemory, childrenMaxPeekMemory);
}
@@ -247,6 +250,20 @@ public class LastQueryMergeOperator implements ProcessOperator {
return maxReturnSize;
}
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ long childrenSum = 0, minChildReturnSize = Long.MAX_VALUE;
+ for (Operator child : children) {
+ long maxReturnSize = child.calculateMaxReturnSize();
+ childrenSum += (maxReturnSize + child.calculateRetainedSizeAfterCallingNext());
+ minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
+ }
+ // max cached TsBlock + cached TreeMap size
+ return (childrenSum - minChildReturnSize)
+ + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()
+ * MAP_NODE_RETRAINED_SIZE;
+ }
+
/**
* If the tsBlock of columnIndex is null or has no more data in the tsBlock, return true; else
* return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
index 40e434ef54..60620b1f56 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
@@ -146,14 +146,24 @@ public class LastQueryOperator implements ProcessOperator {
public long calculateMaxPeekMemory() {
long maxPeekMemory =
Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, tsBlockBuilder.getRetainedSizeInBytes());
+ long res = 0;
for (Operator child : children) {
- maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory());
+ res = Math.max(res, maxPeekMemory + child.calculateMaxPeekMemory());
}
- return maxPeekMemory;
+ return res;
}
@Override
public long calculateMaxReturnSize() {
return Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, tsBlockBuilder.getRetainedSizeInBytes());
}
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ long sum = 0;
+ for (Operator operator : children) {
+ sum += operator.calculateRetainedSizeAfterCallingNext();
+ }
+ return sum;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
index e40da67999..082173c29f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
@@ -194,13 +194,11 @@ public class LastQuerySortOperator implements ProcessOperator {
@Override
public long calculateMaxPeekMemory() {
long maxPeekMemory = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + cachedTsBlock.getRetainedSizeInBytes();
- long maxChildrenReturnSize = 0;
- long maxChildrenPeekMemory = 0;
+ long res = 0;
for (Operator child : children) {
- maxChildrenReturnSize = Math.max(maxChildrenReturnSize, child.calculateMaxReturnSize());
- maxChildrenPeekMemory = Math.max(maxChildrenPeekMemory, child.calculateMaxPeekMemory());
+ res = Math.max(res, maxPeekMemory + child.calculateMaxPeekMemory());
}
- return Math.max(maxPeekMemory + maxChildrenReturnSize, maxChildrenPeekMemory);
+ return res;
}
@Override
@@ -208,6 +206,17 @@ public class LastQuerySortOperator implements ProcessOperator {
return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
}
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ long childrenMaxReturnSize = 0;
+ long childrenSumRetainedSize = 0;
+ for (Operator child : children) {
+ childrenMaxReturnSize = Math.max(childrenMaxReturnSize, child.calculateMaxReturnSize());
+ childrenSumRetainedSize += child.calculateRetainedSizeAfterCallingNext();
+ }
+ return cachedTsBlock.getRetainedSizeInBytes() + childrenMaxReturnSize + childrenSumRetainedSize;
+ }
+
private int getEndIndex() {
return currentIndex + Math.min(MAX_DETECT_COUNT, inputOperatorsCount - currentIndex);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
index afd543b8dc..44e423424e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
@@ -141,4 +141,9 @@ public class UpdateLastCacheOperator implements ProcessOperator {
public long calculateMaxReturnSize() {
return child.calculateMaxReturnSize();
}
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return child.calculateRetainedSizeAfterCallingNext();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
index 5b4f07cf24..c64947e4b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
@@ -133,6 +133,11 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
return maxReturnSize;
}
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0L;
+ }
+
private boolean readChunkData() throws IOException {
while (seriesScanUtil.hasNextChunk()) {
if (readPageData()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
index d9202de3f2..110079809b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
@@ -74,6 +74,11 @@ public class ExchangeOperator implements SourceOperator {
return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
}
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0L;
+ }
+
@Override
public PlanNodeId getSourceId() {
return sourceId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
index 6e5ff45cfa..974758f8a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
@@ -67,6 +67,11 @@ public class LastCacheScanOperator implements SourceOperator {
return tsBlock.getRetainedSizeInBytes();
}
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0L;
+ }
+
@Override
public PlanNodeId getSourceId() {
return sourceId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
index 5a58594940..05685f758d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
@@ -134,6 +134,11 @@ public class SeriesScanOperator implements DataSourceOperator {
return maxReturnSize;
}
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0L;
+ }
+
private boolean readChunkData() throws IOException {
while (seriesScanUtil.hasNextChunk()) {
if (readPageData()) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index b57ee14dcf..349e98758b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -108,6 +108,8 @@ public class OperatorMemoryTest {
assertEquals(
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
seriesScanOperator.calculateMaxReturnSize());
+ assertEquals(0, seriesScanOperator.calculateRetainedSizeAfterCallingNext());
+
} catch (IllegalPathException e) {
e.printStackTrace();
fail();
@@ -152,6 +154,8 @@ public class OperatorMemoryTest {
4 * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
seriesScanOperator.calculateMaxReturnSize());
+ assertEquals(0, seriesScanOperator.calculateRetainedSizeAfterCallingNext());
+
} catch (IllegalPathException e) {
e.printStackTrace();
fail();
@@ -166,6 +170,7 @@ public class OperatorMemoryTest {
assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, exchangeOperator.calculateMaxPeekMemory());
assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, exchangeOperator.calculateMaxReturnSize());
+ assertEquals(0, exchangeOperator.calculateRetainedSizeAfterCallingNext());
}
@Test
@@ -176,6 +181,7 @@ public class OperatorMemoryTest {
assertEquals(1024, lastCacheScanOperator.calculateMaxPeekMemory());
assertEquals(1024, lastCacheScanOperator.calculateMaxReturnSize());
+ assertEquals(0, lastCacheScanOperator.calculateRetainedSizeAfterCallingNext());
}
@Test
@@ -183,12 +189,14 @@ public class OperatorMemoryTest {
Operator child = Mockito.mock(Operator.class);
Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L);
Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+ Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
FillOperator fillOperator =
new FillOperator(Mockito.mock(OperatorContext.class), new IFill[] {null, null}, child);
- assertEquals(2048 * 2, fillOperator.calculateMaxPeekMemory());
+ assertEquals(2048 * 2 + 512, fillOperator.calculateMaxPeekMemory());
assertEquals(1024, fillOperator.calculateMaxReturnSize());
+ assertEquals(512, fillOperator.calculateRetainedSizeAfterCallingNext());
}
@Test
@@ -203,6 +211,7 @@ public class OperatorMemoryTest {
long currentMaxReturnSize = random.nextInt(1024);
Mockito.when(child.calculateMaxPeekMemory()).thenReturn(currentMaxPeekMemory);
Mockito.when(child.calculateMaxReturnSize()).thenReturn(currentMaxReturnSize);
+ Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
children.add(child);
expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, currentMaxPeekMemory);
expectedMaxReturnSize = Math.max(expectedMaxReturnSize, currentMaxReturnSize);
@@ -212,6 +221,7 @@ public class OperatorMemoryTest {
assertEquals(expectedMaxPeekMemory, lastQueryCollectOperator.calculateMaxPeekMemory());
assertEquals(expectedMaxReturnSize, lastQueryCollectOperator.calculateMaxReturnSize());
+ assertEquals(4 * 512, lastQueryCollectOperator.calculateRetainedSizeAfterCallingNext());
}
@Test
@@ -219,18 +229,24 @@ public class OperatorMemoryTest {
List<Operator> children = new ArrayList<>(4);
Random random = new Random();
long expectedMaxPeekMemory = 0;
+ long temp = 0;
long expectedMaxReturnSize = 0;
long childSumReturnSize = 0;
+ long minReturnSize = Long.MAX_VALUE;
for (int i = 0; i < 4; i++) {
Operator child = Mockito.mock(Operator.class);
long currentMaxPeekMemory = random.nextInt(1024) + 1024;
long currentMaxReturnSize = random.nextInt(1024);
+ minReturnSize = Math.min(minReturnSize, currentMaxReturnSize);
childSumReturnSize += currentMaxReturnSize;
Mockito.when(child.calculateMaxPeekMemory()).thenReturn(currentMaxPeekMemory);
Mockito.when(child.calculateMaxReturnSize()).thenReturn(currentMaxReturnSize);
+ Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
children.add(child);
expectedMaxReturnSize = Math.max(expectedMaxReturnSize, currentMaxReturnSize);
- expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, currentMaxPeekMemory);
+ expectedMaxPeekMemory =
+ Math.max(expectedMaxPeekMemory, temp + child.calculateMaxPeekMemory());
+ temp += (child.calculateMaxReturnSize() + child.calculateRetainedSizeAfterCallingNext());
}
// we need to cache all the TsBlocks of children and then return a new TsBlock as result whose
// max possible should be equal to max return size among all its children and then we should
@@ -238,7 +254,7 @@ public class OperatorMemoryTest {
expectedMaxPeekMemory =
Math.max(
expectedMaxPeekMemory,
- childSumReturnSize
+ temp
+ expectedMaxReturnSize
+ TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()
* MAP_NODE_RETRAINED_SIZE);
@@ -249,6 +265,13 @@ public class OperatorMemoryTest {
assertEquals(expectedMaxPeekMemory, lastQueryMergeOperator.calculateMaxPeekMemory());
assertEquals(expectedMaxReturnSize, lastQueryMergeOperator.calculateMaxReturnSize());
+ assertEquals(
+ childSumReturnSize
+ - minReturnSize
+ + 4 * 512
+ + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()
+ * MAP_NODE_RETRAINED_SIZE,
+ lastQueryMergeOperator.calculateRetainedSizeAfterCallingNext());
}
@Test
@@ -256,25 +279,28 @@ public class OperatorMemoryTest {
TsBlockBuilder builder = Mockito.mock(TsBlockBuilder.class);
Mockito.when(builder.getRetainedSizeInBytes()).thenReturn(1024L);
List<UpdateLastCacheOperator> children = new ArrayList<>(4);
- long expectedMaxPeekMemory = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
long expectedMaxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
for (int i = 0; i < 4; i++) {
UpdateLastCacheOperator child = Mockito.mock(UpdateLastCacheOperator.class);
Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2 * 1024 * 1024L);
Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+ Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
children.add(child);
- expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, 2 * 1024 * 1024L);
expectedMaxReturnSize = Math.max(expectedMaxReturnSize, 1024L);
}
LastQueryOperator lastQueryOperator =
new LastQueryOperator(Mockito.mock(OperatorContext.class), children, builder);
- assertEquals(expectedMaxPeekMemory, lastQueryOperator.calculateMaxPeekMemory());
+ assertEquals(
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + 2 * 1024 * 1024L,
+ lastQueryOperator.calculateMaxPeekMemory());
assertEquals(expectedMaxReturnSize, lastQueryOperator.calculateMaxReturnSize());
+ assertEquals(4 * 512L, lastQueryOperator.calculateRetainedSizeAfterCallingNext());
Mockito.when(builder.getRetainedSizeInBytes()).thenReturn(4 * 1024 * 1024L);
- assertEquals(4 * 1024 * 1024L, lastQueryOperator.calculateMaxPeekMemory());
+ assertEquals(4 * 1024 * 1024L + 2 * 1024 * 1024L, lastQueryOperator.calculateMaxPeekMemory());
assertEquals(4 * 1024 * 1024L, lastQueryOperator.calculateMaxReturnSize());
+ assertEquals(4 * 512L, lastQueryOperator.calculateRetainedSizeAfterCallingNext());
}
@Test
@@ -283,23 +309,26 @@ public class OperatorMemoryTest {
Mockito.when(tsBlock.getRetainedSizeInBytes()).thenReturn(16 * 1024L);
Mockito.when(tsBlock.getPositionCount()).thenReturn(16);
List<UpdateLastCacheOperator> children = new ArrayList<>(4);
- long expectedMaxPeekMemory =
- DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + tsBlock.getRetainedSizeInBytes();
+
for (int i = 0; i < 4; i++) {
UpdateLastCacheOperator child = Mockito.mock(UpdateLastCacheOperator.class);
Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2 * 1024L);
Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+ Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
children.add(child);
}
- expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory + 1024, 2 * 1024);
-
LastQuerySortOperator lastQuerySortOperator =
new LastQuerySortOperator(
Mockito.mock(OperatorContext.class), tsBlock, children, Comparator.naturalOrder());
- assertEquals(expectedMaxPeekMemory, lastQuerySortOperator.calculateMaxPeekMemory());
+ assertEquals(
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + tsBlock.getRetainedSizeInBytes() + 2 * 1024L,
+ lastQuerySortOperator.calculateMaxPeekMemory());
assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, lastQuerySortOperator.calculateMaxReturnSize());
+ assertEquals(
+ 16 * 1024L + 1024L + 4 * 512L,
+ lastQuerySortOperator.calculateRetainedSizeAfterCallingNext());
}
@Test
@@ -307,12 +336,14 @@ public class OperatorMemoryTest {
Operator child = Mockito.mock(Operator.class);
Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2 * 1024L);
Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+ Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
LimitOperator limitOperator =
new LimitOperator(Mockito.mock(OperatorContext.class), 100, child);
assertEquals(2 * 1024L, limitOperator.calculateMaxPeekMemory());
assertEquals(1024, limitOperator.calculateMaxReturnSize());
+ assertEquals(512, limitOperator.calculateRetainedSizeAfterCallingNext());
}
@Test
@@ -320,12 +351,14 @@ public class OperatorMemoryTest {
Operator child = Mockito.mock(Operator.class);
Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2 * 1024L);
Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+ Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
OffsetOperator offsetOperator =
new OffsetOperator(Mockito.mock(OperatorContext.class), 100, child);
assertEquals(2 * 1024L, offsetOperator.calculateMaxPeekMemory());
assertEquals(1024, offsetOperator.calculateMaxReturnSize());
+ assertEquals(512, offsetOperator.calculateRetainedSizeAfterCallingNext());
}
@Test
@@ -336,19 +369,22 @@ public class OperatorMemoryTest {
dataTypeList.add(TSDataType.INT32);
long expectedMaxReturnSize =
3L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
- long expectedMaxPeekMemory = expectedMaxReturnSize;
+ long expectedMaxPeekMemory = 0;
long childrenMaxPeekMemory = 0;
for (int i = 0; i < 4; i++) {
Operator child = Mockito.mock(Operator.class);
Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+ Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+ childrenMaxPeekMemory =
+ Math.max(childrenMaxPeekMemory, expectedMaxPeekMemory + child.calculateMaxPeekMemory());
expectedMaxPeekMemory += 64 * 1024L;
- childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
children.add(child);
}
- expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, childrenMaxPeekMemory);
+ expectedMaxPeekMemory =
+ Math.max(expectedMaxPeekMemory + expectedMaxReturnSize, childrenMaxPeekMemory);
RowBasedTimeJoinOperator rowBasedTimeJoinOperator =
new RowBasedTimeJoinOperator(
@@ -356,6 +392,7 @@ public class OperatorMemoryTest {
assertEquals(expectedMaxPeekMemory, rowBasedTimeJoinOperator.calculateMaxPeekMemory());
assertEquals(expectedMaxReturnSize, rowBasedTimeJoinOperator.calculateMaxReturnSize());
+ assertEquals(3 * 64 * 1024L, rowBasedTimeJoinOperator.calculateRetainedSizeAfterCallingNext());
}
@Test
@@ -363,6 +400,7 @@ public class OperatorMemoryTest {
SortOperator sortOperator = new SortOperator();
assertEquals(0, sortOperator.calculateMaxPeekMemory());
assertEquals(0, sortOperator.calculateMaxReturnSize());
+ assertEquals(0, sortOperator.calculateRetainedSizeAfterCallingNext());
}
@Test
@@ -373,19 +411,22 @@ public class OperatorMemoryTest {
dataTypeList.add(TSDataType.INT32);
long expectedMaxReturnSize =
3L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
- long expectedMaxPeekMemory = expectedMaxReturnSize;
+ long expectedMaxPeekMemory = 0;
long childrenMaxPeekMemory = 0;
for (int i = 0; i < 4; i++) {
Operator child = Mockito.mock(Operator.class);
Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+ Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+ childrenMaxPeekMemory =
+ Math.max(childrenMaxPeekMemory, expectedMaxPeekMemory + child.calculateMaxPeekMemory());
expectedMaxPeekMemory += 64 * 1024L;
- childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
children.add(child);
}
- expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, childrenMaxPeekMemory);
+ expectedMaxPeekMemory =
+ Math.max(expectedMaxPeekMemory + expectedMaxReturnSize, childrenMaxPeekMemory);
TimeJoinOperator timeJoinOperator =
new TimeJoinOperator(
@@ -393,6 +434,7 @@ public class OperatorMemoryTest {
assertEquals(expectedMaxPeekMemory, timeJoinOperator.calculateMaxPeekMemory());
assertEquals(expectedMaxReturnSize, timeJoinOperator.calculateMaxReturnSize());
+ assertEquals(3 * 64 * 1024L, timeJoinOperator.calculateRetainedSizeAfterCallingNext());
}
@Test
@@ -400,12 +442,14 @@ public class OperatorMemoryTest {
Operator child = Mockito.mock(Operator.class);
Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L);
Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+ Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
UpdateLastCacheOperator updateLastCacheOperator =
new UpdateLastCacheOperator(null, child, null, TSDataType.BOOLEAN, null, true);
assertEquals(2048, updateLastCacheOperator.calculateMaxPeekMemory());
assertEquals(1024, updateLastCacheOperator.calculateMaxReturnSize());
+ assertEquals(512, updateLastCacheOperator.calculateRetainedSizeAfterCallingNext());
}
@Test
@@ -413,12 +457,14 @@ public class OperatorMemoryTest {
Operator child = Mockito.mock(Operator.class);
Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L);
Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+ Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
LinearFillOperator linearFillOperator =
new LinearFillOperator(
Mockito.mock(OperatorContext.class), new LinearFill[] {null, null}, child);
- assertEquals(2048 * 3, linearFillOperator.calculateMaxPeekMemory());
+ assertEquals(2048 * 3 + 512L, linearFillOperator.calculateMaxPeekMemory());
assertEquals(1024, linearFillOperator.calculateMaxReturnSize());
+ assertEquals(512, linearFillOperator.calculateRetainedSizeAfterCallingNext());
}
}