You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/08 12:56:15 UTC
[iotdb] 01/01: Add memory control for some operators
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch MemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 93cea7db3125ccb0c865f9433856158f3f883e2c
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Aug 8 20:55:30 2022 +0800
Add memory control for some operators
---
.../resources/conf/iotdb-datanode.properties | 6 +--
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 46 ++++++++++++++++------
.../mpp/execution/memory/LocalMemoryManager.java | 8 ++--
.../iotdb/db/mpp/execution/operator/Operator.java | 19 +++++++++
.../execution/operator/process/FillOperator.java | 13 ++++++
.../execution/operator/process/LimitOperator.java | 10 +++++
.../operator/process/LinearFillOperator.java | 14 +++++++
.../execution/operator/process/OffsetOperator.java | 10 +++++
.../execution/operator/process/SortOperator.java | 10 +++++
.../process/join/RowBasedTimeJoinOperator.java | 19 +++++++++
.../operator/process/join/TimeJoinOperator.java | 19 +++++++++
.../process/last/LastQueryCollectOperator.java | 18 +++++++++
.../process/last/LastQueryMergeOperator.java | 30 ++++++++++++++
.../operator/process/last/LastQueryOperator.java | 16 ++++++++
.../process/last/LastQuerySortOperator.java | 17 ++++++++
.../process/last/UpdateLastCacheOperator.java | 10 +++++
.../operator/source/AlignedSeriesScanOperator.java | 21 +++++++++-
.../operator/source/ExchangeOperator.java | 12 ++++++
.../operator/source/LastCacheScanOperator.java | 10 +++++
.../operator/source/SeriesScanOperator.java | 18 ++++++++-
.../execution/operator/source/SeriesScanUtil.java | 4 +-
21 files changed, 307 insertions(+), 23 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 57448902cd..35a0f89d14 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -637,9 +637,9 @@ timestamp_precision=ms
# Datatype: boolean
# meta_data_cache_enable=true
-# Read memory Allocation Ratio: BloomFilterCache, ChunkCache, TimeSeriesMetadataCache, memory used for constructing QueryDataSet and Free Memory Used in Query.
-# The parameter form is a:b:c:d:e, where a, b, c, d and e are integers. for example: 1:1:1:1:1 , 1:100:200:300:400
-# chunk_timeseriesmeta_free_memory_proportion=1:100:200:300:400
+# Read memory allocation ratio: BloomFilterCache : ChunkCache : TimeSeriesMetadataCache : Coordinator : Operators : DataExchange.
+# The parameter form is a:b:c:d:e:f, where a, b, c, d, e and f are integers, for example 1:1:1:1:1:1 or 1:100:200:50:300:350.
+# chunk_timeseriesmeta_free_memory_proportion=1:100:200:50:300:350
####################
### LAST Cache Configuration
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index cb2583e4d9..17234c37be 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -130,10 +130,7 @@ public class IoTDBConfig {
private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() * 3 / 10;
/** Memory allocated for the mtree */
- private long allocateMemoryForSchema = Runtime.getRuntime().maxMemory() * 1 / 10;
-
- /** Memory allocated for the read process besides cache */
- private long allocateMemoryForReadWithoutCache = allocateMemoryForRead * 300 / 1001;
+ private long allocateMemoryForSchema = Runtime.getRuntime().maxMemory() / 10;
private volatile int maxQueryDeduplicatedPathNum = 1000;
@@ -474,6 +471,15 @@ public class IoTDBConfig {
/** Memory allocated for chunk cache in read process */
private long allocateMemoryForChunkCache = allocateMemoryForRead * 100 / 1001;
+ /** Memory allocated for the coordinator */
+ private long allocateMemoryForCoordinator = allocateMemoryForRead * 50 / 1001;
+
+ /** Memory allocated for operators */
+ private long allocateMemoryForOperators = allocateMemoryForRead * 300 / 1001;
+
+ /** Memory allocated for data exchange */
+ private long allocateMemoryForDataExchange = allocateMemoryForRead * 350 / 1001;
+
/** Whether to enable Last cache */
private boolean lastCacheEnable = true;
@@ -1715,14 +1721,6 @@ public class IoTDBConfig {
this.allocateMemoryForRead = allocateMemoryForRead;
}
- public long getAllocateMemoryForReadWithoutCache() {
- return allocateMemoryForReadWithoutCache;
- }
-
- public void setAllocateMemoryForReadWithoutCache(long allocateMemoryForReadWithoutCache) {
- this.allocateMemoryForReadWithoutCache = allocateMemoryForReadWithoutCache;
- }
-
public boolean isEnableExternalSort() {
return enableExternalSort;
}
@@ -1934,6 +1932,30 @@ public class IoTDBConfig {
this.allocateMemoryForChunkCache = allocateMemoryForChunkCache;
}
+ public long getAllocateMemoryForCoordinator() {
+ return allocateMemoryForCoordinator;
+ }
+
+ public void setAllocateMemoryForCoordinator(long allocateMemoryForCoordinator) {
+ this.allocateMemoryForCoordinator = allocateMemoryForCoordinator;
+ }
+
+ public long getAllocateMemoryForOperators() {
+ return allocateMemoryForOperators;
+ }
+
+ public void setAllocateMemoryForOperators(long allocateMemoryForOperators) {
+ this.allocateMemoryForOperators = allocateMemoryForOperators;
+ }
+
+ public long getAllocateMemoryForDataExchange() {
+ return allocateMemoryForDataExchange;
+ }
+
+ public void setAllocateMemoryForDataExchange(long allocateMemoryForDataExchange) {
+ this.allocateMemoryForDataExchange = allocateMemoryForDataExchange;
+ }
+
public boolean isLastCacheEnabled() {
return lastCacheEnable;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java
index d0eff60394..5c6f3c5659 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java
@@ -30,11 +30,9 @@ public class LocalMemoryManager {
private final MemoryPool queryPool;
public LocalMemoryManager() {
- queryPool =
- new MemoryPool(
- "query",
- IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForRead(),
- (long) (IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForRead() * 0.5));
+ long totalMemory = IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForDataExchange();
+ int maxQueryThread = IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread();
+ queryPool = new MemoryPool("query", totalMemory, totalMemory / maxQueryThread);
}
public MemoryPool getQueryPool() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
index dfa08e033f..7d8765eaa3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
@@ -52,4 +52,23 @@ public interface Operator extends AutoCloseable {
* Is this operator completely finished processing and no more output TsBlock will be produced.
*/
boolean isFinished();
+
+ // TODO remove the default once all operators implement this method
+ /**
+ * We should also consider the memory used by its child operators, so the calculation logic may
+ * be like: long estimatedOfCurrentOperator = XXXXX; return max(estimatedOfCurrentOperator,
+ * child1.calculateMaxPeekMemory(), child2.calculateMaxPeekMemory(), ....)
+ *
+ * @return estimated peak memory footprint that the operator tree (rooted at this operator) will
+ * use while doing its own query processing
+ */
+ default long calculateMaxPeekMemory() {
+ return 0L;
+ }
+
+ // TODO remove the default once all operators implement this method
+ /** @return estimated max memory footprint of the TsBlock returned by operator.next() */
+ default long calculateMaxReturnSize() {
+ return 0L;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
index da7ffe6ab1..94fa08fcd9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
@@ -91,4 +91,17 @@ public class FillOperator implements ProcessOperator {
public boolean isFinished() {
return child.isFinished();
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ // while doing constant and previous fill, we may need to copy the corresponding column if there
+ // exist null values,
+ // so the peak memory may be doubled
+ return 2 * child.calculateMaxPeekMemory();
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return child.calculateMaxReturnSize();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
index ef3870fb17..4c5d608902 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
@@ -80,4 +80,14 @@ public class LimitOperator implements ProcessOperator {
public boolean isFinished() {
return remainingLimit == 0 || child.isFinished();
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return child.calculateMaxPeekMemory();
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return child.calculateMaxReturnSize();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
index fcae23ce7c..ee69557074 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
@@ -160,6 +160,20 @@ public class LinearFillOperator implements ProcessOperator {
return cachedTsBlock.isEmpty() && child.isFinished();
}
+ @Override
+ public long calculateMaxPeekMemory() {
+ // while doing linear fill, we may need to copy the corresponding column if there exist null
+ // values, and we may also need to cache the next TsBlock to get the next non-null value,
+ // so the peak memory may be triple or more; here we just use 3 as the estimated factor
+ // because in most cases, we will get the next non-null value in the next TsBlock
+ return 3 * child.calculateMaxPeekMemory();
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return child.calculateMaxReturnSize();
+ }
+
/**
* Judge whether we can use current cached TsBlock to fill Column
*
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
index 2820992745..a329b25346 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
@@ -79,4 +79,14 @@ public class OffsetOperator implements ProcessOperator {
public boolean isFinished() {
return child.isFinished();
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return child.calculateMaxPeekMemory();
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return child.calculateMaxReturnSize();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
index 38bda24bf7..a7cd8d046a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
@@ -54,4 +54,14 @@ public class SortOperator implements ProcessOperator {
public boolean isFinished() {
return false;
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 0;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 0;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
index f026ef35c4..386f840d6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerge
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
@@ -244,6 +245,24 @@ public class RowBasedTimeJoinOperator implements ProcessOperator {
return finished;
}
+ @Override
+ public long calculateMaxPeekMemory() {
+ long maxPeekMemory = calculateMaxReturnSize();
+ long childrenMaxPeekMemory = 0;
+ for (Operator child : children) {
+ maxPeekMemory += child.calculateMaxReturnSize();
+ childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+ }
+ return Math.max(maxPeekMemory, childrenMaxPeekMemory);
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ // time + all value columns
+ return (1L + outputColumnCount)
+ * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ }
+
private void updateTimeSelector(int index) {
timeSelector.add(inputTsBlocks[index].getTimeByIndex(inputIndex[index]));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
index eb185e09ff..bd5e005acd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerge
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
@@ -243,6 +244,24 @@ public class TimeJoinOperator implements ProcessOperator {
return finished;
}
+ @Override
+ public long calculateMaxPeekMemory() {
+ long maxPeekMemory = calculateMaxReturnSize();
+ long childrenMaxPeekMemory = 0;
+ for (Operator child : children) {
+ maxPeekMemory += child.calculateMaxReturnSize();
+ childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+ }
+ return Math.max(maxPeekMemory, childrenMaxPeekMemory);
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ // time + all value columns
+ return (1L + outputColumnCount)
+ * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ }
+
/**
* If the tsBlock of columnIndex is null or has no more data in the tsBlock, return true; else
* return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
index 7f011c32cf..f679bd1769 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
@@ -85,4 +85,22 @@ public class LastQueryCollectOperator implements ProcessOperator {
public boolean isFinished() {
return !hasNext();
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ long maxPeekMemory = 0;
+ for (Operator child : children) {
+ maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory());
+ }
+ return maxPeekMemory;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ long maxReturnMemory = 0;
+ for (Operator child : children) {
+ maxReturnMemory = Math.max(maxReturnMemory, child.calculateMaxReturnSize());
+ }
+ return maxReturnMemory;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
index 99deebfd59..9d8be1e23d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
@@ -21,11 +21,13 @@ package org.apache.iotdb.db.mpp.execution.operator.process.last;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.utils.Binary;
import com.google.common.util.concurrent.ListenableFuture;
+import org.openjdk.jol.info.ClassLayout;
import java.util.ArrayList;
import java.util.Comparator;
@@ -40,6 +42,8 @@ import static org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryU
// time-series
public class LastQueryMergeOperator implements ProcessOperator {
+ private static final int MAP_NODE_RETRAINED_SIZE = 16;
+
private final OperatorContext operatorContext;
private final List<Operator> children;
@@ -219,6 +223,30 @@ public class LastQueryMergeOperator implements ProcessOperator {
return finished;
}
+ @Override
+ public long calculateMaxPeekMemory() {
+ // result size + cached TreeMap size
+ long maxPeekMemory =
+ calculateMaxReturnSize()
+ + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()
+ * (Location.INSTANCE_SIZE + MAP_NODE_RETRAINED_SIZE);
+ long childrenMaxPeekMemory = 0;
+ for (Operator child : children) {
+ maxPeekMemory += child.calculateMaxReturnSize();
+ childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+ }
+ return Math.max(maxPeekMemory, childrenMaxPeekMemory);
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ long maxReturnSize = 0;
+ for (Operator child : children) {
+ maxReturnSize = Math.max(maxReturnSize, child.calculateMaxReturnSize());
+ }
+ return maxReturnSize;
+ }
+
/**
* If the tsBlock of columnIndex is null or has no more data in the tsBlock, return true; else
* return false;
@@ -241,6 +269,8 @@ public class LastQueryMergeOperator implements ProcessOperator {
}
private static class Location {
+
+ private static final long INSTANCE_SIZE = ClassLayout.parseClass(Location.class).instanceSize();
int tsBlockIndex;
int rowIndex;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
index 57d391d5df..40e434ef54 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
@@ -33,6 +33,7 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import static com.google.common.util.concurrent.Futures.successfulAsList;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
// collect all last query result in the same data region and there is no order guarantee
public class LastQueryOperator implements ProcessOperator {
@@ -140,4 +141,19 @@ public class LastQueryOperator implements ProcessOperator {
private int getEndIndex() {
return currentIndex + Math.min(MAX_DETECT_COUNT, inputOperatorsCount - currentIndex);
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ long maxPeekMemory =
+ Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, tsBlockBuilder.getRetainedSizeInBytes());
+ for (Operator child : children) {
+ maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory());
+ }
+ return maxPeekMemory;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, tsBlockBuilder.getRetainedSizeInBytes());
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
index 2945856933..1b56d844c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
import static com.google.common.util.concurrent.Futures.successfulAsList;
import static org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryUtil.compareTimeSeries;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
// collect all last query result in the same data region and sort them according to the
// time-series's alphabetical order
@@ -190,6 +191,22 @@ public class LastQuerySortOperator implements ProcessOperator {
return !hasNext();
}
+ /**
+ * Estimates the peak memory of the operator tree rooted at this operator.
+ *
+ * <p>This operator's own share is DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES (the size reported by
+ * calculateMaxReturnSize()), plus the retained size of tsBlockBuilder, plus the largest
+ * calculateMaxReturnSize() of any child. The result is the larger of that share and the largest
+ * calculateMaxPeekMemory() reported by any child.
+ *
+ * @return estimated peak memory footprint in bytes
+ */
+ @Override
+ public long calculateMaxPeekMemory() {
+ long maxChildrenReturnSize = 0;
+ long maxChildrenPeekMemory = 0;
+ for (Operator child : children) {
+ maxChildrenReturnSize = Math.max(maxChildrenReturnSize, child.calculateMaxReturnSize());
+ maxChildrenPeekMemory = Math.max(maxChildrenPeekMemory, child.calculateMaxPeekMemory());
+ }
+ // include the largest child return size: it was computed but left out of the estimate before
+ long maxPeekMemory =
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES
+ + tsBlockBuilder.getRetainedSizeInBytes()
+ + maxChildrenReturnSize;
+ // a child subtree may itself need more than this operator does
+ return Math.max(maxPeekMemory, maxChildrenPeekMemory);
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ }
+
private int getEndIndex() {
return currentIndex + Math.min(MAX_DETECT_COUNT, inputOperatorsCount - currentIndex);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
index 93a5d81cc3..afd543b8dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
@@ -131,4 +131,14 @@ public class UpdateLastCacheOperator implements ProcessOperator {
public void close() throws Exception {
child.close();
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return child.calculateMaxPeekMemory();
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return child.calculateMaxReturnSize();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
index c47ab9f95d..5b4f07cf24 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -37,6 +38,8 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
private boolean hasCachedTsBlock = false;
private boolean finished = false;
+ private final long maxReturnSize;
+
public AlignedSeriesScanOperator(
PlanNodeId sourceId,
AlignedPath seriesPath,
@@ -54,6 +57,10 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
timeFilter,
valueFilter,
ascending);
+ // time + all value columns
+ this.maxReturnSize =
+ (1L + seriesPath.getMeasurementList().size())
+ * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
}
@Override
@@ -65,7 +72,9 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
public TsBlock next() {
if (hasCachedTsBlock || hasNext()) {
hasCachedTsBlock = false;
- return tsBlock;
+ TsBlock res = tsBlock;
+ tsBlock = null;
+ return res;
}
throw new IllegalStateException("no next batch");
}
@@ -114,6 +123,16 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
return finished || (finished = !hasNext());
}
+ @Override
+ public long calculateMaxPeekMemory() {
+ return maxReturnSize;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return maxReturnSize;
+ }
+
private boolean readChunkData() throws IOException {
while (seriesScanUtil.hasNextChunk()) {
if (readPageData()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
index c72063caeb..d9202de3f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
public class ExchangeOperator implements SourceOperator {
private final OperatorContext operatorContext;
@@ -62,6 +64,16 @@ public class ExchangeOperator implements SourceOperator {
return sourceHandle.isFinished();
}
+ @Override
+ public long calculateMaxPeekMemory() {
+ return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ }
+
@Override
public PlanNodeId getSourceId() {
return sourceId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
index dfb6d82c5c..6e5ff45cfa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
@@ -57,6 +57,16 @@ public class LastCacheScanOperator implements SourceOperator {
return !hasNext();
}
+ @Override
+ public long calculateMaxPeekMemory() {
+ return tsBlock.getRetainedSizeInBytes();
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return tsBlock.getRetainedSizeInBytes();
+ }
+
@Override
public PlanNodeId getSourceId() {
return sourceId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
index f74d14a2f6..5a58594940 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -38,6 +39,8 @@ public class SeriesScanOperator implements DataSourceOperator {
private boolean hasCachedTsBlock = false;
private boolean finished = false;
+ private final long maxReturnSize;
+
public SeriesScanOperator(
PlanNodeId sourceId,
PartialPath seriesPath,
@@ -58,6 +61,7 @@ public class SeriesScanOperator implements DataSourceOperator {
timeFilter,
valueFilter,
ascending);
+ this.maxReturnSize = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
}
@Override
@@ -69,7 +73,9 @@ public class SeriesScanOperator implements DataSourceOperator {
public TsBlock next() {
if (hasCachedTsBlock || hasNext()) {
hasCachedTsBlock = false;
- return tsBlock;
+ TsBlock res = tsBlock;
+ tsBlock = null;
+ return res;
}
throw new IllegalStateException("no next batch");
}
@@ -118,6 +124,16 @@ public class SeriesScanOperator implements DataSourceOperator {
return finished || (finished = !hasNext());
}
+ @Override
+ public long calculateMaxPeekMemory() {
+ return maxReturnSize;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return maxReturnSize;
+ }
+
private boolean readChunkData() throws IOException {
while (seriesScanUtil.hasNextChunk()) {
if (readPageData()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index af6b72e47a..22fd2cbabc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -618,7 +618,9 @@ public class SeriesScanUtil {
if (hasCachedNextOverlappedPage) {
hasCachedNextOverlappedPage = false;
- return cachedTsBlock;
+ TsBlock res = cachedTsBlock;
+ cachedTsBlock = null;
+ return res;
} else {
/*