You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/19 07:21:59 UTC
[iotdb] branch master updated: [IOTDB-4070] Memory control for Query Operators (#6999)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 45923385e3 [IOTDB-4070] Memory control for Query Operators (#6999)
45923385e3 is described below
commit 45923385e3bcc9cdcc71ab66f35c13e07f4d1103
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Fri Aug 19 15:21:54 2022 +0800
[IOTDB-4070] Memory control for Query Operators (#6999)
---
.../org/apache/iotdb/it/env/DataNodeWrapper.java | 2 +
.../org/apache/iotdb/db/it/IoTDBNestedQueryIT.java | 1 +
.../it/IoTDBSameMeasurementsDifferentTypesIT.java | 51 +-
.../db/it/schema/IoTDBSortedShowTimeseriesIT.java | 4 +
.../integration/IoTDBManageTsFileResourceIT.java | 7 +-
.../resources/conf/iotdb-datanode.properties | 14 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 86 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 55 +-
.../iotdb/db/metadata/LocalSchemaProcessor.java | 6 +-
.../db/metadata/rescon/SchemaResourceManager.java | 4 +-
...tatistics.java => SchemaStatisticsManager.java} | 12 +-
.../schemaregion/SchemaRegionMemoryImpl.java | 12 +-
.../schemaregion/SchemaRegionSchemaFileImpl.java | 12 +-
.../timerangeiterator/AggrWindowIterator.java | 23 +
.../timerangeiterator/ITimeRangeIterator.java | 2 +
.../timerangeiterator/PreAggrWindowIterator.java | 17 +
.../PreAggrWindowWithNaturalMonthIterator.java | 16 +
.../SingleTimeWindowIterator.java | 5 +
.../MemoryNotEnoughException.java} | 25 +-
.../fragment/FragmentInstanceContext.java | 4 +
.../mpp/execution/memory/LocalMemoryManager.java | 4 +-
.../db/mpp/execution/operator/AggregationUtil.java | 73 +
.../iotdb/db/mpp/execution/operator/Operator.java | 22 +
.../operator/process/AggregationOperator.java | 42 +-
.../operator/process/DeviceMergeOperator.java | 34 +
.../operator/process/DeviceViewOperator.java | 28 +
.../execution/operator/process/FillOperator.java | 19 +
.../operator/process/FilterAndProjectOperator.java | 108 ++
.../execution/operator/process/LimitOperator.java | 15 +
.../operator/process/LinearFillOperator.java | 20 +
.../execution/operator/process/OffsetOperator.java | 15 +
.../process/RawDataAggregationOperator.java | 7 +-
.../process/SingleInputAggregationOperator.java | 36 +-
.../process/SlidingWindowAggregationOperator.java | 6 +-
.../execution/operator/process/SortOperator.java | 15 +
.../operator/process/TransformOperator.java | 24 +
.../process/join/RowBasedTimeJoinOperator.java | 35 +
.../operator/process/join/TimeJoinOperator.java | 35 +
.../process/last/LastQueryCollectOperator.java | 28 +
.../process/last/LastQueryMergeOperator.java | 47 +
.../operator/process/last/LastQueryOperator.java | 26 +
.../process/last/LastQuerySortOperator.java | 27 +
.../process/last/UpdateLastCacheOperator.java | 15 +
.../operator/schema/CountMergeOperator.java | 29 +
.../operator/schema/DevicesCountOperator.java | 17 +
.../schema/LevelTimeSeriesCountOperator.java | 17 +
.../schema/NodeManageMemoryMergeOperator.java | 18 +
.../operator/schema/NodePathsConvertOperator.java | 15 +
.../operator/schema/NodePathsCountOperator.java | 18 +
.../schema/NodePathsSchemaScanOperator.java | 17 +
.../operator/schema/SchemaFetchMergeOperator.java | 29 +
.../operator/schema/SchemaFetchScanOperator.java | 32 +-
.../operator/schema/SchemaQueryMergeOperator.java | 29 +
.../schema/SchemaQueryOrderByHeatOperator.java | 40 +
.../operator/schema/SchemaQueryScanOperator.java | 35 +-
.../operator/schema/TimeSeriesCountOperator.java | 17 +
.../AbstractSeriesAggregationScanOperator.java | 38 +-
.../AlignedSeriesAggregationScanOperator.java | 9 +-
.../operator/source/AlignedSeriesScanOperator.java | 26 +-
.../operator/source/ExchangeOperator.java | 17 +
.../operator/source/LastCacheScanOperator.java | 15 +
.../source/SeriesAggregationScanOperator.java | 9 +-
.../operator/source/SeriesScanOperator.java | 23 +-
.../execution/operator/source/SeriesScanUtil.java | 4 +-
.../mpp/plan/analyze/GroupByLevelController.java | 28 +-
.../iotdb/db/mpp/plan/analyze/TypeProvider.java | 14 +-
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 69 +-
.../db/mpp/plan/planner/LogicalPlanBuilder.java | 5 +-
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 180 ++-
.../StatisticsManager.java} | 35 +-
.../TimeseriesStats.java} | 25 +-
.../column/multi/MappableUDFColumnTransformer.java | 4 +
.../column/ternary/TernaryColumnTransformer.java | 12 +
.../dag/column/unary/UnaryColumnTransformer.java | 4 +
.../iotdb/db/rescon/TsFileResourceManager.java | 3 +-
.../iotdb/db/utils/datastructure/TimeSelector.java | 5 +
.../db/mpp/aggregation/TimeRangeIteratorTest.java | 2 +
.../operator/AggregationOperatorTest.java | 15 +-
.../AlignedSeriesAggregationScanOperatorTest.java | 11 +-
.../mpp/execution/operator/FillOperatorTest.java | 30 +
.../operator/LastQueryMergeOperatorTest.java | 60 +
.../execution/operator/LastQueryOperatorTest.java | 18 +-
.../operator/LastQuerySortOperatorTest.java | 18 +-
.../execution/operator/LinearFillOperatorTest.java | 105 ++
.../mpp/execution/operator/OperatorMemoryTest.java | 1542 ++++++++++++++++++++
.../operator/RawDataAggregationOperatorTest.java | 5 +-
.../SeriesAggregationScanOperatorTest.java | 6 +-
.../SlidingWindowAggregationOperatorTest.java | 10 +-
.../operator/UpdateLastCacheOperatorTest.java | 6 +-
.../iotdb/db/rescon/ResourceManagerTest.java | 10 +-
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 2 +
91 files changed, 3401 insertions(+), 316 deletions(-)
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java
index c3a9ad7c1f..56a27e24e1 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java
@@ -58,6 +58,8 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
if (this.targetConfigNode != null) {
properties.setProperty(IoTDBConstant.TARGET_CONFIG_NODES, this.targetConfigNode);
}
+ properties.setProperty("max_tsblock_size_in_bytes", "1024");
+ properties.setProperty("page_size_in_byte", "1024");
}
@Override
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBNestedQueryIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBNestedQueryIT.java
index 413aadd643..3acd717f29 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBNestedQueryIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBNestedQueryIT.java
@@ -371,6 +371,7 @@ public class IoTDBNestedQueryIT {
statement.executeQuery(query);
} catch (SQLException e) {
Assert.assertTrue(
+ e.getMessage(),
e.getMessage()
.contains("The argument of the aggregation function must be a time series."));
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSameMeasurementsDifferentTypesIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSameMeasurementsDifferentTypesIT.java
index efa64e4745..82337bd862 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSameMeasurementsDifferentTypesIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSameMeasurementsDifferentTypesIT.java
@@ -47,7 +47,7 @@ import static org.junit.Assert.fail;
@Category({LocalStandaloneIT.class, ClusterIT.class})
public class IoTDBSameMeasurementsDifferentTypesIT {
- private static BaseConfig tsFileConfig = ConfigFactory.getConfig();
+ private static final BaseConfig tsFileConfig = ConfigFactory.getConfig();
private static int maxNumberOfPointsInPage;
private static int pageSizeInByte;
private static int groupSizeInByte;
@@ -88,10 +88,6 @@ public class IoTDBSameMeasurementsDifferentTypesIT {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
- for (String sql : TestConstant.createSql) {
- statement.execute(sql);
- }
-
statement.execute("SET STORAGE GROUP TO root.fans");
statement.execute("CREATE TIMESERIES root.fans.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE");
statement.execute("CREATE TIMESERIES root.fans.d1.s0 WITH DATATYPE=INT64, ENCODING=RLE");
@@ -127,14 +123,13 @@ public class IoTDBSameMeasurementsDifferentTypesIT {
ResultSet resultSet1 = statement1.executeQuery(selectSql);
int cnt1 = 0;
while (resultSet1.next() && cnt1 < 5) {
- StringBuilder builder = new StringBuilder();
- builder
- .append(resultSet1.getString(TestConstant.TIMESTAMP_STR))
- .append(",")
- .append(resultSet1.getString("root.fans.d0.s0"))
- .append(",")
- .append(resultSet1.getString("root.fans.d1.s0"));
- Assert.assertEquals(retArray[cnt1], builder.toString());
+ String ans =
+ resultSet1.getString(TestConstant.TIMESTAMP_STR)
+ + ","
+ + resultSet1.getString("root.fans.d0.s0")
+ + ","
+ + resultSet1.getString("root.fans.d1.s0");
+ Assert.assertEquals(retArray[cnt1], ans);
cnt1++;
}
@@ -142,14 +137,13 @@ public class IoTDBSameMeasurementsDifferentTypesIT {
ResultSet resultSet2 = statement2.executeQuery(selectSql);
int cnt2 = 0;
while (resultSet2.next()) {
- StringBuilder builder = new StringBuilder();
- builder
- .append(resultSet2.getString(TestConstant.TIMESTAMP_STR))
- .append(",")
- .append(resultSet2.getString("root.fans.d0.s0"))
- .append(",")
- .append(resultSet2.getString("root.fans.d1.s0"));
- Assert.assertEquals(retArray[cnt2], builder.toString());
+ String ans =
+ resultSet2.getString(TestConstant.TIMESTAMP_STR)
+ + ","
+ + resultSet2.getString("root.fans.d0.s0")
+ + ","
+ + resultSet2.getString("root.fans.d1.s0");
+ Assert.assertEquals(retArray[cnt2], ans);
cnt2++;
}
Assert.assertEquals(9, cnt2);
@@ -158,14 +152,13 @@ public class IoTDBSameMeasurementsDifferentTypesIT {
// function,
// and the cursor has been moved to the next position, so we should fetch that value first.
do {
- StringBuilder builder = new StringBuilder();
- builder
- .append(resultSet1.getString(TestConstant.TIMESTAMP_STR))
- .append(",")
- .append(resultSet1.getString("root.fans.d0.s0"))
- .append(",")
- .append(resultSet1.getString("root.fans.d1.s0"));
- Assert.assertEquals(retArray[cnt1], builder.toString());
+ String ans =
+ resultSet1.getString(TestConstant.TIMESTAMP_STR)
+ + ","
+ + resultSet1.getString("root.fans.d0.s0")
+ + ","
+ + resultSet1.getString("root.fans.d1.s0");
+ Assert.assertEquals(retArray[cnt1], ans);
cnt1++;
} while (resultSet1.next());
// Although the statement2 has the same sql as statement1, they shouldn't affect each other.
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBSortedShowTimeseriesIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBSortedShowTimeseriesIT.java
index 0410304308..c1554b081b 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBSortedShowTimeseriesIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBSortedShowTimeseriesIT.java
@@ -189,6 +189,7 @@ public class IoTDBSortedShowTimeseriesIT {
count++;
}
assertEquals(retArray1.size(), count);
+ resultSet.close();
resultSet = statement.executeQuery("show LATEST timeseries");
count = 0;
@@ -214,6 +215,7 @@ public class IoTDBSortedShowTimeseriesIT {
count++;
}
assertEquals(retArray2.size(), count);
+ resultSet.close();
} catch (Exception e) {
e.printStackTrace();
@@ -266,6 +268,7 @@ public class IoTDBSortedShowTimeseriesIT {
count++;
}
assertEquals(retSet.size(), count);
+ resultSet.close();
} catch (Exception e) {
e.printStackTrace();
@@ -313,6 +316,7 @@ public class IoTDBSortedShowTimeseriesIT {
count++;
}
assertEquals(retArray.length, count);
+ resultSet.close();
} catch (Exception e) {
e.printStackTrace();
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBManageTsFileResourceIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBManageTsFileResourceIT.java
index 41a270e637..96031214e7 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBManageTsFileResourceIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBManageTsFileResourceIT.java
@@ -54,8 +54,7 @@ import static org.junit.Assert.assertTrue;
public class IoTDBManageTsFileResourceIT {
private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
private TsFileResourceManager tsFileResourceManager = TsFileResourceManager.getInstance();
- private double prevTimeIndexMemoryProportion;
- private double prevTimeIndexMemoryThreshold;
+ private long prevTimeIndexMemoryThreshold;
private int prevCompactionThreadNum;
private static String[] unSeqSQLs =
@@ -89,7 +88,7 @@ public class IoTDBManageTsFileResourceIT {
@Before
public void setUp() throws ClassNotFoundException {
EnvironmentUtils.envSetUp();
- prevTimeIndexMemoryProportion = CONFIG.getTimeIndexMemoryProportion();
+ prevTimeIndexMemoryThreshold = CONFIG.getAllocateMemoryForTimeIndex();
prevCompactionThreadNum = CONFIG.getConcurrentCompactionThread();
Class.forName(Config.JDBC_DRIVER_NAME);
}
@@ -97,8 +96,6 @@ public class IoTDBManageTsFileResourceIT {
@After
public void tearDown() throws Exception {
EnvironmentUtils.cleanEnv();
- prevTimeIndexMemoryThreshold =
- prevTimeIndexMemoryProportion * CONFIG.getAllocateMemoryForRead();
tsFileResourceManager.setTimeIndexMemoryThreshold(prevTimeIndexMemoryThreshold);
CONFIG.setConcurrentCompactionThread(prevCompactionThreadNum);
}
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 85dc0ce5ff..42d67f75f0 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -448,10 +448,6 @@ timestamp_precision=ms
# Datatype: double
# flush_proportion=0.4
-# Ratio of read memory allocated for timeIndex, 0.2 by default
-# Datatype: double
-# time_index_memory_proportion=0.2
-
# Ratio of write memory allocated for buffered arrays, 0.6 by default
# Datatype: double
# buffered_arrays_memory_proportion=0.6
@@ -485,6 +481,10 @@ timestamp_precision=ms
# Datatype: int
# io_task_queue_size_for_flushing=10
+# If true, we will estimate each query's possible memory footprint before executing it and deny it if its estimated memory exceeds current free memory
+# Datatype: boolean
+# enable_query_memory_estimation=true
+
####################
### Upgrade Configurations
####################
@@ -646,9 +646,9 @@ timestamp_precision=ms
# Datatype: boolean
# meta_data_cache_enable=true
-# Read memory Allocation Ratio: BloomFilterCache, ChunkCache, TimeSeriesMetadataCache, memory used for constructing QueryDataSet and Free Memory Used in Query.
-# The parameter form is a:b:c:d:e, where a, b, c, d and e are integers. for example: 1:1:1:1:1 , 1:100:200:300:400
-# chunk_timeseriesmeta_free_memory_proportion=1:100:200:300:400
+# Read memory Allocation Ratio: BloomFilterCache : ChunkCache : TimeSeriesMetadataCache : Coordinator : Operators : DataExchange : timeIndex in TsFileResourceList : others.
+# The parameter form is a:b:c:d:e:f:g:h, where a, b, c, d, e, f, g and h are integers. for example: 1:1:1:1:1:1:1:1 , 1:100:200:50:200:200:200:50
+# chunk_timeseriesmeta_free_memory_proportion=1:100:200:50:200:200:200:50
####################
### LAST Cache Configuration
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 3ee6e92b35..379ba1fe6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -137,19 +137,13 @@ public class IoTDBConfig {
private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() * 3 / 10;
/** Memory allocated for the mtree */
- private long allocateMemoryForSchema = Runtime.getRuntime().maxMemory() * 1 / 10;
-
- /** Memory allocated for the read process besides cache */
- private long allocateMemoryForReadWithoutCache = allocateMemoryForRead * 300 / 1001;
+ private long allocateMemoryForSchema = Runtime.getRuntime().maxMemory() / 10;
private volatile int maxQueryDeduplicatedPathNum = 1000;
/** Ratio of memory allocated for buffered arrays */
private double bufferedArraysMemoryProportion = 0.6;
- /** Memory allocated proportion for timeIndex */
- private double timeIndexMemoryProportion = 0.2;
-
/** Flush proportion for system */
private double flushProportion = 0.4;
@@ -477,6 +471,24 @@ public class IoTDBConfig {
/** Memory allocated for chunk cache in read process */
private long allocateMemoryForChunkCache = allocateMemoryForRead * 100 / 1001;
+ /** Memory allocated for coordinator */
+ private long allocateMemoryForCoordinator = allocateMemoryForRead * 50 / 1001;
+
+ /** Memory allocated for operators */
+ private long allocateMemoryForOperators = allocateMemoryForRead * 200 / 1001;
+
+ /** Memory allocated for data exchange */
+ private long allocateMemoryForDataExchange = allocateMemoryForRead * 200 / 1001;
+
+ /** Memory allocated for timeIndex */
+ private long allocateMemoryForTimeIndex = allocateMemoryForRead * 200 / 1001;
+
+ /**
+ * If true, we will estimate each query's possible memory footprint before executing it and deny
+ * it if its estimated memory exceeds current free memory
+ */
+ private boolean enableQueryMemoryEstimation = true;
+
/** Whether to enable Last cache */
private boolean lastCacheEnable = true;
@@ -1358,6 +1370,10 @@ public class IoTDBConfig {
this.concurrentSubRawQueryThread = concurrentSubRawQueryThread;
}
+ public long getMaxBytesPerQuery() {
+ return allocateMemoryForDataExchange / concurrentQueryThread;
+ }
+
public int getRawQueryBlockingQueueCapacity() {
return rawQueryBlockingQueueCapacity;
}
@@ -1688,14 +1704,6 @@ public class IoTDBConfig {
this.bufferedArraysMemoryProportion = bufferedArraysMemoryProportion;
}
- public double getTimeIndexMemoryProportion() {
- return timeIndexMemoryProportion;
- }
-
- public void setTimeIndexMemoryProportion(double timeIndexMemoryProportion) {
- this.timeIndexMemoryProportion = timeIndexMemoryProportion;
- }
-
public double getFlushProportion() {
return flushProportion;
}
@@ -1744,14 +1752,6 @@ public class IoTDBConfig {
this.allocateMemoryForRead = allocateMemoryForRead;
}
- public long getAllocateMemoryForReadWithoutCache() {
- return allocateMemoryForReadWithoutCache;
- }
-
- public void setAllocateMemoryForReadWithoutCache(long allocateMemoryForReadWithoutCache) {
- this.allocateMemoryForReadWithoutCache = allocateMemoryForReadWithoutCache;
- }
-
public boolean isEnableExternalSort() {
return enableExternalSort;
}
@@ -1963,6 +1963,46 @@ public class IoTDBConfig {
this.allocateMemoryForChunkCache = allocateMemoryForChunkCache;
}
+ public long getAllocateMemoryForCoordinator() {
+ return allocateMemoryForCoordinator;
+ }
+
+ public void setAllocateMemoryForCoordinator(long allocateMemoryForCoordinator) {
+ this.allocateMemoryForCoordinator = allocateMemoryForCoordinator;
+ }
+
+ public long getAllocateMemoryForOperators() {
+ return allocateMemoryForOperators;
+ }
+
+ public void setAllocateMemoryForOperators(long allocateMemoryForOperators) {
+ this.allocateMemoryForOperators = allocateMemoryForOperators;
+ }
+
+ public long getAllocateMemoryForDataExchange() {
+ return allocateMemoryForDataExchange;
+ }
+
+ public void setAllocateMemoryForDataExchange(long allocateMemoryForDataExchange) {
+ this.allocateMemoryForDataExchange = allocateMemoryForDataExchange;
+ }
+
+ public long getAllocateMemoryForTimeIndex() {
+ return allocateMemoryForTimeIndex;
+ }
+
+ public void setAllocateMemoryForTimeIndex(long allocateMemoryForTimeIndex) {
+ this.allocateMemoryForTimeIndex = allocateMemoryForTimeIndex;
+ }
+
+ public boolean isEnableQueryMemoryEstimation() {
+ return enableQueryMemoryEstimation;
+ }
+
+ public void setEnableQueryMemoryEstimation(boolean enableQueryMemoryEstimation) {
+ this.enableQueryMemoryEstimation = enableQueryMemoryEstimation;
+ }
+
public boolean isLastCacheEnabled() {
return lastCacheEnable;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 3d9e81624f..6376b09224 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -200,12 +200,6 @@ public class IoTDBDescriptor {
"buffered_arrays_memory_proportion",
Double.toString(conf.getBufferedArraysMemoryProportion()))));
- conf.setTimeIndexMemoryProportion(
- Double.parseDouble(
- properties.getProperty(
- "time_index_memory_proportion",
- Double.toString(conf.getTimeIndexMemoryProportion()))));
-
conf.setFlushProportion(
Double.parseDouble(
properties.getProperty(
@@ -1223,6 +1217,16 @@ public class IoTDBDescriptor {
"max_tsblock_size_in_bytes",
Integer.toString(
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes()))));
+
+ // min(default_size, maxBytesPerQuery)
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setMaxTsBlockSizeInBytes(
+ (int)
+ Math.min(
+ TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
+ conf.getMaxBytesPerQuery()));
+
TSFileDescriptor.getInstance()
.getConfig()
.setMaxTsBlockLineNumber(
@@ -1421,6 +1425,13 @@ public class IoTDBDescriptor {
.trim()));
conf.setIpWhiteList(properties.getProperty("ip_white_list", conf.getIpWhiteList()));
+ // update enable query memory estimation for memory control
+ conf.setEnableQueryMemoryEstimation(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_query_memory_estimation",
+ Boolean.toString(conf.isEnableQueryMemoryEstimation()))));
+
// update wal config
long prevDeleteWalFilesPeriodInMs = conf.getDeleteWalFilesPeriodInMs();
loadWALHotModifiedProps(properties);
@@ -1485,9 +1496,11 @@ public class IoTDBDescriptor {
"max_deduplicated_path_num",
Integer.toString(conf.getMaxQueryDeduplicatedPathNum()))));
- if (!conf.isMetaDataCacheEnable()) {
- return;
- }
+ conf.setEnableQueryMemoryEstimation(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_query_memory_estimation",
+ Boolean.toString(conf.isEnableQueryMemoryEstimation()))));
String queryMemoryAllocateProportion =
properties.getProperty("chunk_timeseriesmeta_free_memory_proportion");
@@ -1506,8 +1519,14 @@ public class IoTDBDescriptor {
maxMemoryAvailable * Integer.parseInt(proportions[1].trim()) / proportionSum);
conf.setAllocateMemoryForTimeSeriesMetaDataCache(
maxMemoryAvailable * Integer.parseInt(proportions[2].trim()) / proportionSum);
- conf.setAllocateMemoryForReadWithoutCache(
+ conf.setAllocateMemoryForCoordinator(
maxMemoryAvailable * Integer.parseInt(proportions[3].trim()) / proportionSum);
+ conf.setAllocateMemoryForOperators(
+ maxMemoryAvailable * Integer.parseInt(proportions[4].trim()) / proportionSum);
+ conf.setAllocateMemoryForDataExchange(
+ maxMemoryAvailable * Integer.parseInt(proportions[5].trim()) / proportionSum);
+ conf.setAllocateMemoryForTimeIndex(
+ maxMemoryAvailable * Integer.parseInt(proportions[6].trim()) / proportionSum);
} catch (Exception e) {
throw new RuntimeException(
"Each subsection of configuration item chunkmeta_chunk_timeseriesmeta_free_memory_proportion"
@@ -1516,6 +1535,22 @@ public class IoTDBDescriptor {
}
}
}
+
+ // metadata cache is disabled, we need to move all their allocated memory to other parts
+ if (!conf.isMetaDataCacheEnable()) {
+ long sum =
+ conf.getAllocateMemoryForBloomFilterCache()
+ + conf.getAllocateMemoryForChunkCache()
+ + conf.getAllocateMemoryForTimeSeriesMetaDataCache();
+ conf.setAllocateMemoryForBloomFilterCache(0);
+ conf.setAllocateMemoryForChunkCache(0);
+ conf.setAllocateMemoryForTimeSeriesMetaDataCache(0);
+ long partForDataExchange = sum / 2;
+ long partForOperators = sum - partForDataExchange;
+ conf.setAllocateMemoryForDataExchange(
+ conf.getAllocateMemoryForDataExchange() + partForDataExchange);
+ conf.setAllocateMemoryForOperators(conf.getAllocateMemoryForOperators() + partForOperators);
+ }
}
private void initSchemaMemoryAllocate(Properties properties) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
index 32518a6239..fafd13ddd9 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
@@ -37,7 +37,7 @@ import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
-import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
+import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.metadata.template.Template;
@@ -466,7 +466,7 @@ public class LocalSchemaProcessor {
// todo this is for test assistance, refactor this to support massive timeseries
if (pathPattern.getFullPath().equals("root.**")
&& TemplateManager.getInstance().getAllTemplateName().isEmpty()) {
- return (int) TimeseriesStatistics.getInstance().getTotalSeriesNumber();
+ return (int) SchemaStatisticsManager.getInstance().getTotalSeriesNumber();
}
int count = 0;
for (ISchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, isPrefixMatch)) {
@@ -1380,7 +1380,7 @@ public class LocalSchemaProcessor {
@TestOnly
public long getTotalSeriesNumber() {
- return TimeseriesStatistics.getInstance().getTotalSeriesNumber();
+ return SchemaStatisticsManager.getInstance().getTotalSeriesNumber();
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
index 0fe18ac955..c5df8ff9a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
@@ -30,7 +30,7 @@ public class SchemaResourceManager {
private SchemaResourceManager() {}
public static void initSchemaResource() {
- TimeseriesStatistics.getInstance().init();
+ SchemaStatisticsManager.getInstance().init();
MemoryStatistics.getInstance().init();
if (IoTDBDescriptor.getInstance()
.getConfig()
@@ -41,7 +41,7 @@ public class SchemaResourceManager {
}
public static void clearSchemaResource() {
- TimeseriesStatistics.getInstance().clear();
+ SchemaStatisticsManager.getInstance().clear();
MemoryStatistics.getInstance().clear();
if (IoTDBDescriptor.getInstance()
.getConfig()
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java
similarity index 86%
rename from server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java
rename to server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java
index f8a1f21a20..6d63cdfe85 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java
@@ -25,22 +25,22 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
import java.util.concurrent.atomic.AtomicLong;
-public class TimeseriesStatistics {
+public class SchemaStatisticsManager {
private final AtomicLong totalSeriesNumber = new AtomicLong();
- private static class TimeseriesStatisticsHolder {
+ private static class SchemaStatisticsHolder {
- private TimeseriesStatisticsHolder() {
+ private SchemaStatisticsHolder() {
// allowed to do nothing
}
- private static final TimeseriesStatistics INSTANCE = new TimeseriesStatistics();
+ private static final SchemaStatisticsManager INSTANCE = new SchemaStatisticsManager();
}
/** we should not use this function in other place, but only in IoTDB class */
- public static TimeseriesStatistics getInstance() {
- return TimeseriesStatisticsHolder.INSTANCE;
+ public static SchemaStatisticsManager getInstance() {
+ return SchemaStatisticsHolder.INSTANCE;
}
public void init() {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index 71c6b3632e..a4c2c8a8fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -52,7 +52,7 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.mtree.MTreeBelowSGMemoryImpl;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.rescon.MemoryStatistics;
-import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
+import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
import org.apache.iotdb.db.metadata.tag.TagManager;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.metadata.template.TemplateManager;
@@ -168,7 +168,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
private boolean usingMLog = true;
private MLogWriter logWriter;
- private TimeseriesStatistics timeseriesStatistics = TimeseriesStatistics.getInstance();
+ private SchemaStatisticsManager schemaStatisticsManager = SchemaStatisticsManager.getInstance();
private MemoryStatistics memoryStatistics = MemoryStatistics.getInstance();
private final IStorageGroupMNode storageGroupMNode;
@@ -454,7 +454,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
// collect all the LeafMNode in this schema region
List<IMeasurementMNode> leafMNodes = mtree.getAllMeasurementMNode();
- timeseriesStatistics.deleteTimeseries(leafMNodes.size());
+ schemaStatisticsManager.deleteTimeseries(leafMNodes.size());
// drop triggers with no exceptions
TriggerEngine.drop(leafMNodes);
@@ -601,7 +601,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
mNodeCache.invalidate(path.getDevicePath());
// update statistics and schemaDataTypeNumMap
- timeseriesStatistics.addTimeseries(1);
+ schemaStatisticsManager.addTimeseries(1);
// update tag index
if (offset != -1 && isRecovering) {
@@ -718,7 +718,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
mNodeCache.invalidate(prefixPath);
// update statistics and schemaDataTypeNumMap
- timeseriesStatistics.addTimeseries(plan.getMeasurements().size());
+ schemaStatisticsManager.addTimeseries(plan.getMeasurements().size());
List<Long> tagOffsets = plan.getTagOffsets();
for (int i = 0; i < measurements.size(); i++) {
@@ -860,7 +860,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
mNodeCache.invalidate(node.getPartialPath());
- timeseriesStatistics.deleteTimeseries(1);
+ schemaStatisticsManager.deleteTimeseries(1);
return storageGroupPath;
}
// endregion
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index 34c49106b3..57129f5063 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -50,7 +50,7 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.mtree.MTreeBelowSGCachedImpl;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.rescon.MemoryStatistics;
-import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
+import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
import org.apache.iotdb.db.metadata.tag.TagManager;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.metadata.template.TemplateManager;
@@ -162,7 +162,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
private File logFile;
private MLogWriter logWriter;
- private TimeseriesStatistics timeseriesStatistics = TimeseriesStatistics.getInstance();
+ private SchemaStatisticsManager schemaStatisticsManager = SchemaStatisticsManager.getInstance();
private MemoryStatistics memoryStatistics = MemoryStatistics.getInstance();
private final IStorageGroupMNode storageGroupMNode;
@@ -413,7 +413,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
// collect all the LeafMNode in this schema region
List<IMeasurementMNode> leafMNodes = mtree.getAllMeasurementMNode();
- timeseriesStatistics.deleteTimeseries(leafMNodes.size());
+ schemaStatisticsManager.deleteTimeseries(leafMNodes.size());
// drop triggers with no exceptions
TriggerEngine.drop(leafMNodes);
@@ -495,7 +495,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
mNodeCache.invalidate(path.getDevicePath());
// update statistics and schemaDataTypeNumMap
- timeseriesStatistics.addTimeseries(1);
+ schemaStatisticsManager.addTimeseries(1);
// update tag index
if (offset != -1 && isRecovering) {
@@ -637,7 +637,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
mNodeCache.invalidate(prefixPath);
// update statistics and schemaDataTypeNumMap
- timeseriesStatistics.addTimeseries(plan.getMeasurements().size());
+ schemaStatisticsManager.addTimeseries(plan.getMeasurements().size());
List<Long> tagOffsets = plan.getTagOffsets();
for (int i = 0; i < measurements.size(); i++) {
@@ -785,7 +785,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
mNodeCache.invalidate(node.getPartialPath());
- timeseriesStatistics.deleteTimeseries(1);
+ schemaStatisticsManager.deleteTimeseries(1);
return storageGroupPath;
}
// endregion
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/AggrWindowIterator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/AggrWindowIterator.java
index f27c56223c..50f50e4175 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/AggrWindowIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/AggrWindowIterator.java
@@ -176,4 +176,27 @@ public class AggrWindowIterator implements ITimeRangeIterator {
public long currentOutputTime() {
return leftCRightO ? curTimeRange.getMin() : curTimeRange.getMax();
}
+
+ @Override
+ public long getTotalIntervalNum() {
+ long queryRange = endTime - startTime;
+ long intervalNum;
+
+ if (isSlidingStepByMonth) {
+ intervalNum = (long) Math.ceil(queryRange / (double) (slidingStep * MS_TO_MONTH));
+ long retStartTime = DatetimeUtils.calcIntervalByMonth(startTime, intervalNum * slidingStep);
+ while (retStartTime > endTime) {
+ intervalNum -= 1;
+ retStartTime = DatetimeUtils.calcIntervalByMonth(startTime, intervalNum * slidingStep);
+ }
+ } else {
+ intervalNum = (long) Math.ceil(queryRange / (double) slidingStep);
+ }
+ return intervalNum;
+ }
+
+ public void reset() {
+ curTimeRange = null;
+ hasCachedTimeRange = false;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/ITimeRangeIterator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/ITimeRangeIterator.java
index ccc999e810..47a6961fd7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/ITimeRangeIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/ITimeRangeIterator.java
@@ -55,4 +55,6 @@ public interface ITimeRangeIterator {
* @return minTime if leftCloseRightOpen, else maxTime.
*/
long currentOutputTime();
+
+ long getTotalIntervalNum();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowIterator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowIterator.java
index f7f6297ddb..c0b8c3b000 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowIterator.java
@@ -173,4 +173,21 @@ public class PreAggrWindowIterator implements ITimeRangeIterator {
public long currentOutputTime() {
return leftCRightO ? curTimeRange.getMin() : curTimeRange.getMax();
}
+
+ @Override
+ public long getTotalIntervalNum() {
+ long queryRange = endTime - startTime;
+ if (slidingStep >= interval || interval % slidingStep == 0) {
+ return (long) Math.ceil(queryRange / (double) slidingStep);
+ }
+
+ long interval1 = interval % slidingStep, interval2 = slidingStep - interval % slidingStep;
+ long intervalNum = Math.floorDiv(queryRange, interval1 + interval2);
+ long tmpStartTime = startTime + intervalNum * (interval1 + interval2);
+ if (tmpStartTime + interval1 > endTime) {
+ return intervalNum * 2 + 1;
+ } else {
+ return intervalNum * 2 + 2;
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowWithNaturalMonthIterator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowWithNaturalMonthIterator.java
index d8eede01ef..87f6cb8630 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowWithNaturalMonthIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/PreAggrWindowWithNaturalMonthIterator.java
@@ -147,4 +147,20 @@ public class PreAggrWindowWithNaturalMonthIterator implements ITimeRangeIterator
public long currentOutputTime() {
return leftCRightO ? curTimeRange.getMin() : curTimeRange.getMax();
}
+
+ @Override
+ public long getTotalIntervalNum() {
+ long tmpInterval = 0;
+ while (hasNextTimeRange()) {
+ tmpInterval++;
+ nextTimeRange();
+ }
+
+ curTimeRange = null;
+ timeBoundaryHeap.clear();
+ aggrWindowIterator.reset();
+ initHeap();
+
+ return tmpInterval;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SingleTimeWindowIterator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SingleTimeWindowIterator.java
index b998557604..4802267afa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SingleTimeWindowIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SingleTimeWindowIterator.java
@@ -71,4 +71,9 @@ public class SingleTimeWindowIterator implements ITimeRangeIterator {
public long currentOutputTime() {
return curTimeRange.getMin();
}
+
+ @Override
+ public long getTotalIntervalNum() {
+ return 1;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/exception/MemoryNotEnoughException.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/exception/MemoryNotEnoughException.java
index d0eff60394..7c5f4803e3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/exception/MemoryNotEnoughException.java
@@ -16,28 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.iotdb.db.mpp.exception;
-package org.apache.iotdb.db.mpp.execution.memory;
+import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
+public class MemoryNotEnoughException extends IoTDBException {
-/**
- * Manages memory of a data node. The memory is divided into two memory pools so that the memory for
- * read and for write can be isolated.
- */
-public class LocalMemoryManager {
-
- private final MemoryPool queryPool;
-
- public LocalMemoryManager() {
- queryPool =
- new MemoryPool(
- "query",
- IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForRead(),
- (long) (IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForRead() * 0.5));
- }
-
- public MemoryPool getQueryPool() {
- return queryPool;
+ public MemoryNotEnoughException(String message, int errorCode) {
+ super(message, errorCode);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index aff2d66a41..4d0f9d9ffd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -179,4 +179,8 @@ public class FragmentInstanceContext extends QueryContext {
public long getEndTime() {
return executionEndTime.get();
}
+
+ public FragmentInstanceStateMachine getStateMachine() {
+ return stateMachine;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java
index d0eff60394..e3d170cbf7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java
@@ -33,8 +33,8 @@ public class LocalMemoryManager {
queryPool =
new MemoryPool(
"query",
- IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForRead(),
- (long) (IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForRead() * 0.5));
+ IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForDataExchange(),
+ IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerQuery());
}
public MemoryPool getQueryPool() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
index 5465800553..5bbb1adfa0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
@@ -19,20 +19,35 @@
package org.apache.iotdb.db.mpp.execution.operator;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.SingleTimeWindowIterator;
import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.TimeRangeIteratorFactory;
+import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.statistics.StatisticsManager;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.iotdb.tsfile.utils.Pair;
+import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
import static org.apache.iotdb.tsfile.read.common.block.TsBlockUtil.skipPointsOutOfTimeRange;
public class AggregationUtil {
@@ -158,4 +173,62 @@ public class AggregationUtil {
}
return true;
}
+
+ public static long calculateMaxAggregationResultSize(
+ List<? extends AggregationDescriptor> aggregationDescriptors,
+ ITimeRangeIterator timeRangeIterator,
+ TypeProvider typeProvider) {
+ long timeValueColumnsSizePerLine = TimeColumn.SIZE_IN_BYTES_PER_POSITION;
+ for (AggregationDescriptor descriptor : aggregationDescriptors) {
+ List<TSDataType> outPutDataTypes =
+ descriptor.getOutputColumnNames().stream()
+ .map(typeProvider::getType)
+ .collect(Collectors.toList());
+ for (TSDataType tsDataType : outPutDataTypes) {
+ // TODO modify after statistics finish
+ PartialPath mockSeriesPath = new PartialPath();
+ timeValueColumnsSizePerLine += getOutputColumnSizePerLine(tsDataType, mockSeriesPath);
+ }
+ }
+
+ return Math.min(
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+ Math.min(
+ TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(),
+ timeRangeIterator.getTotalIntervalNum())
+ * timeValueColumnsSizePerLine);
+ }
+
+ public static long calculateMaxAggregationResultSizeForLastQuery(
+ List<Aggregator> aggregators, PartialPath inputSeriesPath) {
+ long timeValueColumnsSizePerLine = TimeColumn.SIZE_IN_BYTES_PER_POSITION;
+ List<TSDataType> outPutDataTypes =
+ aggregators.stream()
+ .flatMap(aggregator -> Arrays.stream(aggregator.getOutputType()))
+ .collect(Collectors.toList());
+ for (TSDataType tsDataType : outPutDataTypes) {
+ timeValueColumnsSizePerLine += getOutputColumnSizePerLine(tsDataType, inputSeriesPath);
+ }
+ return timeValueColumnsSizePerLine;
+ }
+
+ private static long getOutputColumnSizePerLine(
+ TSDataType tsDataType, PartialPath inputSeriesPath) {
+ switch (tsDataType) {
+ case INT32:
+ return IntColumn.SIZE_IN_BYTES_PER_POSITION;
+ case INT64:
+ return LongColumn.SIZE_IN_BYTES_PER_POSITION;
+ case FLOAT:
+ return FloatColumn.SIZE_IN_BYTES_PER_POSITION;
+ case DOUBLE:
+ return DoubleColumn.SIZE_IN_BYTES_PER_POSITION;
+ case BOOLEAN:
+ return BooleanColumn.SIZE_IN_BYTES_PER_POSITION;
+ case TEXT:
+ return StatisticsManager.getInstance().getMaxBinarySizeInBytes(inputSeriesPath);
+ default:
+ throw new UnsupportedOperationException("Unknown data type " + tsDataType);
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
index dfa08e033f..b7b05a9289 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
@@ -52,4 +52,26 @@ public interface Operator extends AutoCloseable {
* Is this operator completely finished processing and no more output TsBlock will be produced.
*/
boolean isFinished();
+
+ /**
+ * We should also consider the memory used by its child operators, so the calculation logic may
+ * be like: long estimatedOfCurrentOperator = XXXXX; return max(estimatedOfCurrentOperator,
+ * child1.calculateMaxPeekMemory(), child2.calculateMaxPeekMemory(), ....)
+ *
+ * <p>Each operator's MaxPeekMemory should also take retained size of each child operator into
+ * account.
+ *
+ * @return estimated max memory footprint that the Operator Tree(rooted from this operator) will
+ * use while doing its own query processing
+ */
+ long calculateMaxPeekMemory();
+
+ /** @return estimated max memory footprint for returned TsBlock when calling operator.next() */
+ long calculateMaxReturnSize();
+
+ /**
+ * @return each operator's retained size (including all its children's retained size) after
+ * calling its next() method
+ */
+ long calculateRetainedSizeAfterCallingNext();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 8cf4aef8b0..62215cb9c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -37,7 +36,6 @@ import java.util.concurrent.TimeUnit;
import static com.google.common.util.concurrent.Futures.successfulAsList;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
-import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
/**
* AggregationOperator can process the situation: aggregation of intermediate aggregate result, it
@@ -62,16 +60,20 @@ public class AggregationOperator implements ProcessOperator {
// using for building result tsBlock
private final TsBlockBuilder resultTsBlockBuilder;
+ private final long maxRetainedSize;
+ private final long childrenRetainedSize;
+ private final long maxReturnSize;
+
public AggregationOperator(
OperatorContext operatorContext,
List<Aggregator> aggregators,
+ ITimeRangeIterator timeRangeIterator,
List<Operator> children,
- boolean ascending,
- GroupByTimeParameter groupByTimeParameter,
- boolean outputPartialTimeWindow) {
+ long maxReturnSize) {
this.operatorContext = operatorContext;
this.children = children;
this.aggregators = aggregators;
+ this.timeRangeIterator = timeRangeIterator;
this.inputOperatorsCount = children.size();
this.inputTsBlocks = new TsBlock[inputOperatorsCount];
@@ -80,14 +82,16 @@ public class AggregationOperator implements ProcessOperator {
canCallNext[i] = false;
}
- this.timeRangeIterator =
- initTimeRangeIterator(groupByTimeParameter, ascending, outputPartialTimeWindow);
-
List<TSDataType> dataTypes = new ArrayList<>();
for (Aggregator aggregator : aggregators) {
dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
}
this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+
+ this.maxRetainedSize = children.stream().mapToLong(Operator::calculateMaxReturnSize).sum();
+ this.childrenRetainedSize =
+ children.stream().mapToLong(Operator::calculateRetainedSizeAfterCallingNext).sum();
+ this.maxReturnSize = maxReturnSize;
}
@Override
@@ -95,6 +99,21 @@ public class AggregationOperator implements ProcessOperator {
return operatorContext;
}
+ @Override
+ public long calculateMaxPeekMemory() {
+ return maxReturnSize + maxRetainedSize + childrenRetainedSize;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return maxReturnSize;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return maxRetainedSize + childrenRetainedSize;
+ }
+
@Override
public ListenableFuture<?> isBlocked() {
List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
@@ -123,9 +142,6 @@ public class AggregationOperator implements ProcessOperator {
long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
long start = System.nanoTime();
- // reset operator state
- resultTsBlockBuilder.reset();
-
while (System.nanoTime() - start < maxRuntime
&& (curTimeRange != null || timeRangeIterator.hasNextTimeRange())
&& !resultTsBlockBuilder.isFull()) {
@@ -149,7 +165,9 @@ public class AggregationOperator implements ProcessOperator {
}
if (resultTsBlockBuilder.getPositionCount() > 0) {
- return resultTsBlockBuilder.build();
+ TsBlock resultTsBlock = resultTsBlockBuilder.build();
+ resultTsBlockBuilder.reset();
+ return resultTsBlock;
} else {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
index e0ccbe86ba..511a75991b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator;
@@ -278,4 +279,37 @@ public class DeviceMergeOperator implements ProcessOperator {
return inputTsBlocks[tsBlockIndex] == null
|| inputTsBlocks[tsBlockIndex].getPositionCount() == 0;
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ // timeSelector will cache time, we use a single time column to represent max memory cost
+ long maxPeekMemory = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ // inputTsBlocks will cache all TsBlocks returned by deviceOperators
+ for (Operator operator : deviceOperators) {
+ maxPeekMemory += operator.calculateMaxReturnSize();
+ maxPeekMemory += operator.calculateRetainedSizeAfterCallingNext();
+ }
+ for (Operator operator : deviceOperators) {
+ maxPeekMemory = Math.max(maxPeekMemory, operator.calculateMaxPeekMemory());
+ }
+ return Math.max(maxPeekMemory, calculateMaxReturnSize());
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ // time + all value columns
+ return (1L + dataTypes.size()) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ long currentRetainedSize = 0, minChildReturnSize = Long.MAX_VALUE;
+ for (Operator child : deviceOperators) {
+ long maxReturnSize = child.calculateMaxReturnSize();
+ currentRetainedSize += (maxReturnSize + child.calculateRetainedSizeAfterCallingNext());
+ minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
+ }
+ // max cached TsBlock
+ return currentRetainedSize - minChildReturnSize;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
index 127a26a8b6..728da7e7db 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumnBuilder;
@@ -152,4 +153,31 @@ public class DeviceViewOperator implements ProcessOperator {
public boolean isFinished() {
return !this.hasNext();
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ long maxPeekMemory = calculateMaxReturnSize() + calculateRetainedSizeAfterCallingNext();
+ for (Operator child : deviceOperators) {
+ maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory());
+ }
+ return maxPeekMemory;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ // null columns would be filled, so the return size equals
+ // (numberOfValueColumns(dataTypes.size() - 1) + 1(timeColumn)) * columnSize + deviceColumnSize
+ // (the size of the device name column is ignored here)
+ return (long) (dataTypes.size())
+ * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ long sum = 0;
+ for (Operator operator : deviceOperators) {
+ sum += operator.calculateRetainedSizeAfterCallingNext();
+ }
+ return sum;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
index da7ffe6ab1..c182168bd3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
@@ -91,4 +91,23 @@ public class FillOperator implements ProcessOperator {
public boolean isFinished() {
return child.isFinished();
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ // while doing constant and previous fill, we may need to copy the corresponding column if there
+ // exist null values,
+ // so the max peak memory may be doubled
+ return 2 * child.calculateMaxPeekMemory() + child.calculateRetainedSizeAfterCallingNext();
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ // we can safely ignore one line cached in IFill
+ return child.calculateRetainedSizeAfterCallingNext();
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return child.calculateMaxReturnSize();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
index 60dc07b2f2..80b75a40e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
@@ -22,7 +22,14 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.binary.BinaryColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.ConstantColumnTransformer;
import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.LeafColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.TimeColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.multi.MappableUDFColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ternary.TernaryColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.unary.UnaryColumnTransformer;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
@@ -199,4 +206,105 @@ public class FilterAndProjectOperator implements ProcessOperator {
public ListenableFuture<?> isBlocked() {
return inputOperator.isBlocked();
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ long maxPeekMemory = inputOperator.calculateMaxReturnSize();
+ int maxCachedColumn = 0;
+ // Only do projection, calculate max cached column size of calc tree
+ if (!hasFilter) {
+ for (int i = 0; i < projectOutputTransformerList.size(); i++) {
+ ColumnTransformer c = projectOutputTransformerList.get(i);
+ maxCachedColumn = Math.max(maxCachedColumn, 1 + i + getMaxLevelOfColumnTransformerTree(c));
+ }
+ return Math.max(
+ maxPeekMemory,
+ (long) maxCachedColumn
+ * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte())
+ + inputOperator.calculateRetainedSizeAfterCallingNext();
+ }
+
+ // has Filter
+ maxCachedColumn =
+ Math.max(
+ 1 + getMaxLevelOfColumnTransformerTree(filterOutputTransformer),
+ 1 + commonTransformerList.size());
+ if (!hasNonMappableUDF) {
+ for (int i = 0; i < projectOutputTransformerList.size(); i++) {
+ ColumnTransformer c = projectOutputTransformerList.get(i);
+ maxCachedColumn = Math.max(maxCachedColumn, 1 + i + getMaxLevelOfColumnTransformerTree(c));
+ }
+ }
+ return Math.max(
+ maxPeekMemory,
+ (long) maxCachedColumn * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte())
+ + inputOperator.calculateRetainedSizeAfterCallingNext();
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ // time + all value columns
+ if (!hasFilter || !hasNonMappableUDF) {
+ return (long) (1 + projectOutputTransformerList.size())
+ * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ } else {
+ return (long) (1 + filterTsBlockBuilder.getValueColumnBuilders().length)
+ * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ }
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return inputOperator.calculateRetainedSizeAfterCallingNext();
+ }
+
+ private int getMaxLevelOfColumnTransformerTree(ColumnTransformer columnTransformer) {
+ if (columnTransformer instanceof LeafColumnTransformer) {
+ // Time column is always calculated, we ignore it here. Constant column is ignored.
+ if (columnTransformer instanceof ConstantColumnTransformer
+ || columnTransformer instanceof TimeColumnTransformer) {
+ return 0;
+ } else {
+ return 1;
+ }
+ } else if (columnTransformer instanceof UnaryColumnTransformer) {
+ return Math.max(
+ 2,
+ getMaxLevelOfColumnTransformerTree(
+ ((UnaryColumnTransformer) columnTransformer).getChildColumnTransformer()));
+ } else if (columnTransformer instanceof BinaryColumnTransformer) {
+ int childMaxLevel =
+ Math.max(
+ getMaxLevelOfColumnTransformerTree(
+ ((BinaryColumnTransformer) columnTransformer).getLeftTransformer()),
+ getMaxLevelOfColumnTransformerTree(
+ ((BinaryColumnTransformer) columnTransformer).getRightTransformer()));
+ return Math.max(3, childMaxLevel);
+ } else if (columnTransformer instanceof TernaryColumnTransformer) {
+ int childMaxLevel =
+ Math.max(
+ getMaxLevelOfColumnTransformerTree(
+ ((TernaryColumnTransformer) columnTransformer).getFirstColumnTransformer()),
+ Math.max(
+ getMaxLevelOfColumnTransformerTree(
+ ((TernaryColumnTransformer) columnTransformer).getSecondColumnTransformer()),
+ getMaxLevelOfColumnTransformerTree(
+ ((TernaryColumnTransformer) columnTransformer).getThirdColumnTransformer())));
+ return Math.max(4, childMaxLevel);
+ } else if (columnTransformer instanceof MappableUDFColumnTransformer) {
+ int childMaxLevel = 0;
+ for (ColumnTransformer c :
+ ((MappableUDFColumnTransformer) columnTransformer).getInputColumnTransformers()) {
+ childMaxLevel = Math.max(childMaxLevel, getMaxLevelOfColumnTransformerTree(c));
+ }
+ return Math.max(
+ 1
+ + ((MappableUDFColumnTransformer) columnTransformer)
+ .getInputColumnTransformers()
+ .length,
+ childMaxLevel);
+ } else {
+ throw new UnsupportedOperationException("Unsupported ColumnTransformer");
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
index ef3870fb17..726f5e7ecf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
@@ -80,4 +80,19 @@ public class LimitOperator implements ProcessOperator {
public boolean isFinished() {
return remainingLimit == 0 || child.isFinished();
}
+
+ /** Peak-memory estimate: the child's estimate is reported unchanged, no extra term is added. */
+ @Override
+ public long calculateMaxPeekMemory() {
+ final long childPeakMemory = child.calculateMaxPeekMemory();
+ return childPeakMemory;
+ }
+
+ /** Maximum size of one returned block: identical to what the child reports. */
+ @Override
+ public long calculateMaxReturnSize() {
+ final long childReturnSize = child.calculateMaxReturnSize();
+ return childReturnSize;
+ }
+
+ /** Retained size after next(): only what the child reports as retained is counted. */
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ final long retainedByChild = child.calculateRetainedSizeAfterCallingNext();
+ return retainedByChild;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
index fcae23ce7c..bcbb92a932 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
@@ -160,6 +160,26 @@ public class LinearFillOperator implements ProcessOperator {
return cachedTsBlock.isEmpty() && child.isFinished();
}
+ /**
+ * Peak-memory estimate: three times the child's peak plus what the child retains.
+ *
+ * <p>Linear fill may copy a column that contains nulls and may also cache the next TsBlock to
+ * find the next non-null value, so the real peak can be triple or more. A factor of 3 is used
+ * as the estimate because the next non-null value is usually found in the very next TsBlock.
+ */
+ @Override
+ public long calculateMaxPeekMemory() {
+ final long tripledChildPeak = 3 * child.calculateMaxPeekMemory();
+ final long retainedByChild = child.calculateRetainedSizeAfterCallingNext();
+ return tripledChildPeak + retainedByChild;
+ }
+
+ /** Maximum size of one returned block: identical to what the child reports. */
+ @Override
+ public long calculateMaxReturnSize() {
+ final long childReturnSize = child.calculateMaxReturnSize();
+ return childReturnSize;
+ }
+
+ /**
+ * Retained size after next(): only the child's retained size is counted. The two lines that
+ * LinearFill itself caches are deliberately ignored as negligible.
+ */
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ final long retainedByChild = child.calculateRetainedSizeAfterCallingNext();
+ return retainedByChild;
+ }
+
/**
* Judge whether we can use current cached TsBlock to fill Column
*
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
index 2820992745..572738d081 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
@@ -79,4 +79,19 @@ public class OffsetOperator implements ProcessOperator {
public boolean isFinished() {
return child.isFinished();
}
+
+ /** Peak-memory estimate: the child's estimate is reported unchanged, no extra term is added. */
+ @Override
+ public long calculateMaxPeekMemory() {
+ final long childPeakMemory = child.calculateMaxPeekMemory();
+ return childPeakMemory;
+ }
+
+ /** Maximum size of one returned block: identical to what the child reports. */
+ @Override
+ public long calculateMaxReturnSize() {
+ final long childReturnSize = child.calculateMaxReturnSize();
+ return childReturnSize;
+ }
+
+ /** Retained size after next(): only what the child reports as retained is counted. */
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ final long retainedByChild = child.calculateRetainedSizeAfterCallingNext();
+ return retainedByChild;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index 88fb5fe20a..d95968850d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -20,9 +20,9 @@
package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -45,10 +45,11 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
public RawDataAggregationOperator(
OperatorContext operatorContext,
List<Aggregator> aggregators,
+ ITimeRangeIterator timeRangeIterator,
Operator child,
boolean ascending,
- GroupByTimeParameter groupByTimeParameter) {
- super(operatorContext, aggregators, child, ascending, groupByTimeParameter, true);
+ long maxReturnSize) {
+ super(operatorContext, aggregators, child, ascending, timeRangeIterator, maxReturnSize);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
index c59de13988..8632041ee9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -37,7 +36,6 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
-import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
public abstract class SingleInputAggregationOperator implements ProcessOperator {
@@ -57,26 +55,30 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
// using for building result tsBlock
protected final TsBlockBuilder resultTsBlockBuilder;
+ protected final long maxRetainedSize;
+ protected final long maxReturnSize;
+
public SingleInputAggregationOperator(
OperatorContext operatorContext,
List<Aggregator> aggregators,
Operator child,
boolean ascending,
- GroupByTimeParameter groupByTimeParameter,
- boolean outputPartialTimeWindow) {
+ ITimeRangeIterator timeRangeIterator,
+ long maxReturnSize) {
this.operatorContext = operatorContext;
this.ascending = ascending;
this.child = child;
this.aggregators = aggregators;
-
- this.timeRangeIterator =
- initTimeRangeIterator(groupByTimeParameter, ascending, outputPartialTimeWindow);
+ this.timeRangeIterator = timeRangeIterator;
List<TSDataType> dataTypes = new ArrayList<>();
for (Aggregator aggregator : aggregators) {
dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
}
this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+
+ this.maxRetainedSize = child.calculateMaxReturnSize();
+ this.maxReturnSize = maxReturnSize;
}
@Override
@@ -101,7 +103,6 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
long start = System.nanoTime();
// reset operator state
- resultTsBlockBuilder.reset();
canCallNext = true;
while (System.nanoTime() - start < maxRuntime
@@ -124,7 +125,9 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
}
if (resultTsBlockBuilder.getPositionCount() > 0) {
- return resultTsBlockBuilder.build();
+ TsBlock resultTsBlock = resultTsBlockBuilder.build();
+ resultTsBlockBuilder.reset();
+ return resultTsBlock;
} else {
return null;
}
@@ -146,4 +149,19 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
curTimeRange = null;
appendAggregationResult(resultTsBlockBuilder, aggregators, timeRangeIterator);
}
+
+ /**
+ * Peak-memory estimate: this operator's own result block (maxReturnSize), plus one input block
+ * from the child (maxRetainedSize, set from the child's max return size in the constructor),
+ * plus whatever the child itself retains.
+ */
+ @Override
+ public long calculateMaxPeekMemory() {
+ final long ownBlocks = maxReturnSize + maxRetainedSize;
+ return ownBlocks + child.calculateRetainedSizeAfterCallingNext();
+ }
+
+ /** Maximum size of one returned block: the value handed to the constructor. */
+ @Override
+ public long calculateMaxReturnSize() {
+ return this.maxReturnSize;
+ }
+
+ /** Retained size after next(): one cached input block plus what the child itself retains. */
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ final long cachedInputBlock = maxRetainedSize;
+ return cachedInputBlock + child.calculateRetainedSizeAfterCallingNext();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
index 10303016c5..d73491d3e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -40,10 +40,12 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper
public SlidingWindowAggregationOperator(
OperatorContext operatorContext,
List<Aggregator> aggregators,
+ ITimeRangeIterator timeRangeIterator,
Operator child,
boolean ascending,
- GroupByTimeParameter groupByTimeParameter) {
- super(operatorContext, aggregators, child, ascending, groupByTimeParameter, false);
+ GroupByTimeParameter groupByTimeParameter,
+ long maxReturnSize) {
+ super(operatorContext, aggregators, child, ascending, timeRangeIterator, maxReturnSize);
checkArgument(
groupByTimeParameter != null,
"GroupByTimeParameter cannot be null in SlidingWindowAggregationOperator");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
index 38bda24bf7..5fa693b2a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
@@ -54,4 +54,19 @@ public class SortOperator implements ProcessOperator {
public boolean isFinished() {
return false;
}
+
+ /** Peak-memory estimate: always reported as zero by this operator. */
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 0L;
+ }
+
+ /** Maximum size of one returned block: always reported as zero by this operator. */
+ @Override
+ public long calculateMaxReturnSize() {
+ return 0L;
+ }
+
+ /** Retained size after next(): always reported as zero by this operator. */
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0L;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
index 0e08b682f1..13f97fe8c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.mpp.transformation.dag.input.QueryDataSetInputLayer;
import org.apache.iotdb.db.mpp.transformation.dag.input.TsBlockInputDataSet;
import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFContext;
import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -350,4 +351,27 @@ public class TransformOperator implements ProcessOperator {
public OperatorContext getOperatorContext() {
return operatorContext;
}
+
+ /**
+ * Peak-memory estimate in bytes: the full collector and transformer UDF budgets plus one input
+ * block. This is deliberately the maximum estimated usage.
+ */
+ @Override
+ public long calculateMaxPeekMemory() {
+ // The two budget fields are named "...InMB" while the input operator reports bytes, so the
+ // budgets are converted to bytes before the terms are summed.
+ final double udfBudgetInMB =
+ (double) udfCollectorMemoryBudgetInMB + udfTransformerMemoryBudgetInMB;
+ final long udfBudgetInBytes = (long) (udfBudgetInMB * 1024 * 1024);
+ return udfBudgetInBytes + inputOperator.calculateMaxReturnSize();
+ }
+
+ /** Maximum size of one returned block: one page per output column (time + every value column). */
+ @Override
+ public long calculateMaxReturnSize() {
+ // the time column plus one value column per transformer
+ final long columnCount = 1 + transformers.length;
+ return columnCount * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ }
+
+ /**
+ * Retained size after next(), in bytes: what the input operator retains plus the full collector
+ * budget, because the collector may cache points between calls (maximum usage is assumed).
+ */
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ // The collector budget field is named "...InMB" while the input operator reports bytes, so
+ // the budget is converted to bytes before the two terms are summed.
+ final long collectorBudgetInBytes =
+ (long) ((double) udfCollectorMemoryBudgetInMB * 1024 * 1024);
+ return inputOperator.calculateRetainedSizeAfterCallingNext() + collectorBudgetInBytes;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
index f026ef35c4..695893d400 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerge
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
@@ -244,6 +245,40 @@ public class RowBasedTimeJoinOperator implements ProcessOperator {
return finished;
}
+ /**
+ * Peak-memory estimate. Children are visited in order; while one child runs, the blocks and
+ * retained memory of all earlier children are counted as still held. The result is the larger
+ * of (a) the worst such moment and (b) everything held by all children plus this operator's own
+ * result block.
+ */
+ @Override
+ public long calculateMaxPeekMemory() {
+ long heldByEarlierChildren = 0;
+ long peakWhileChildRuns = 0;
+ for (Operator child : children) {
+ final long peakWithThisChild = heldByEarlierChildren + child.calculateMaxPeekMemory();
+ peakWhileChildRuns = Math.max(peakWhileChildRuns, peakWithThisChild);
+ heldByEarlierChildren += child.calculateMaxReturnSize();
+ heldByEarlierChildren += child.calculateRetainedSizeAfterCallingNext();
+ }
+ final long peakWhileBuildingResult = heldByEarlierChildren + calculateMaxReturnSize();
+ return Math.max(peakWhileBuildingResult, peakWhileChildRuns);
+ }
+
+ /** Maximum size of one returned block: one page per output column (time + every value column). */
+ @Override
+ public long calculateMaxReturnSize() {
+ final long columnCount = 1L + outputColumnCount;
+ final long pageSizeInByte = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ return columnCount * pageSizeInByte;
+ }
+
+ /**
+ * Retained size after next(): the sum over all children of (max return size + retained size),
+ * minus the smallest child return size, i.e. the largest set of blocks that can stay cached.
+ * Returns 0 when there is no child.
+ */
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ long cachedByAllChildren = 0;
+ long smallestChildReturnSize = Long.MAX_VALUE;
+ boolean hasChild = false;
+ for (Operator child : children) {
+ hasChild = true;
+ final long childReturnSize = child.calculateMaxReturnSize();
+ cachedByAllChildren += childReturnSize + child.calculateRetainedSizeAfterCallingNext();
+ smallestChildReturnSize = Math.min(smallestChildReturnSize, childReturnSize);
+ }
+ // without any child nothing is cached; subtracting the Long.MAX_VALUE sentinel would yield a
+ // negative estimate
+ return hasChild ? cachedByAllChildren - smallestChildReturnSize : 0;
+ }
+
private void updateTimeSelector(int index) {
timeSelector.add(inputTsBlocks[index].getTimeByIndex(inputIndex[index]));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
index eb185e09ff..3170838986 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerge
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
@@ -243,6 +244,40 @@ public class TimeJoinOperator implements ProcessOperator {
return finished;
}
+ /**
+ * Peak-memory estimate. Children are visited in order; while one child runs, the blocks and
+ * retained memory of all earlier children are counted as still held. The result is the larger
+ * of (a) the worst such moment and (b) everything held by all children plus this operator's own
+ * result block.
+ */
+ @Override
+ public long calculateMaxPeekMemory() {
+ long heldByEarlierChildren = 0;
+ long peakWhileChildRuns = 0;
+ for (Operator child : children) {
+ final long peakWithThisChild = heldByEarlierChildren + child.calculateMaxPeekMemory();
+ peakWhileChildRuns = Math.max(peakWhileChildRuns, peakWithThisChild);
+ heldByEarlierChildren += child.calculateMaxReturnSize();
+ heldByEarlierChildren += child.calculateRetainedSizeAfterCallingNext();
+ }
+ final long peakWhileBuildingResult = heldByEarlierChildren + calculateMaxReturnSize();
+ return Math.max(peakWhileBuildingResult, peakWhileChildRuns);
+ }
+
+ /** Maximum size of one returned block: one page per output column (time + every value column). */
+ @Override
+ public long calculateMaxReturnSize() {
+ final long columnCount = 1L + outputColumnCount;
+ final long pageSizeInByte = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ return columnCount * pageSizeInByte;
+ }
+
+ /**
+ * Retained size after next(): the sum over all children of (max return size + retained size),
+ * minus the smallest child return size, i.e. the largest set of blocks that can stay cached.
+ * Returns 0 when there is no child.
+ */
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ long cachedByAllChildren = 0;
+ long smallestChildReturnSize = Long.MAX_VALUE;
+ boolean hasChild = false;
+ for (Operator child : children) {
+ hasChild = true;
+ final long childReturnSize = child.calculateMaxReturnSize();
+ cachedByAllChildren += childReturnSize + child.calculateRetainedSizeAfterCallingNext();
+ smallestChildReturnSize = Math.min(smallestChildReturnSize, childReturnSize);
+ }
+ // without any child nothing is cached; subtracting the Long.MAX_VALUE sentinel would yield a
+ // negative estimate
+ return hasChild ? cachedByAllChildren - smallestChildReturnSize : 0;
+ }
+
/**
* If the tsBlock of columnIndex is null or has no more data in the tsBlock, return true; else
* return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
index 7f011c32cf..b91e346372 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
@@ -85,4 +85,32 @@ public class LastQueryCollectOperator implements ProcessOperator {
public boolean isFinished() {
return !hasNext();
}
+
+ /**
+ * Peak-memory estimate: the largest value found among every child's peak estimate and every
+ * child's retained size.
+ */
+ @Override
+ public long calculateMaxPeekMemory() {
+ long largest = 0;
+ for (Operator child : children) {
+ final long childPeak = child.calculateMaxPeekMemory();
+ final long childRetained = child.calculateRetainedSizeAfterCallingNext();
+ largest = Math.max(largest, Math.max(childPeak, childRetained));
+ }
+ return largest;
+ }
+
+ /** Maximum size of one returned block: the largest max return size among the children. */
+ @Override
+ public long calculateMaxReturnSize() {
+ long largestChildReturnSize = 0;
+ for (Operator child : children) {
+ final long childReturnSize = child.calculateMaxReturnSize();
+ if (childReturnSize > largestChildReturnSize) {
+ largestChildReturnSize = childReturnSize;
+ }
+ }
+ return largestChildReturnSize;
+ }
+
+ /** Retained size after next(): the children's retained sizes added together. */
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ long retainedByChildren = 0;
+ for (Operator child : children) {
+ retainedByChildren = retainedByChildren + child.calculateRetainedSizeAfterCallingNext();
+ }
+ return retainedByChildren;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
index 99deebfd59..5ad04bdc2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
@@ -21,11 +21,13 @@ package org.apache.iotdb.db.mpp.execution.operator.process.last;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.utils.Binary;
import com.google.common.util.concurrent.ListenableFuture;
+import org.openjdk.jol.info.ClassLayout;
import java.util.ArrayList;
import java.util.Comparator;
@@ -40,6 +42,8 @@ import static org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryU
// time-series
public class LastQueryMergeOperator implements ProcessOperator {
+ public static final long MAP_NODE_RETRAINED_SIZE = 16L + Location.INSTANCE_SIZE;
+
private final OperatorContext operatorContext;
private final List<Operator> children;
@@ -219,6 +223,47 @@ public class LastQueryMergeOperator implements ProcessOperator {
return finished;
}
+ /**
+ * Peak-memory estimate. Children are visited in order; while one child runs, the blocks and
+ * retained memory of all earlier children are counted as still held. The result is the larger
+ * of (a) the worst such moment and (b) everything held by all children plus this operator's
+ * result block and its cached TreeMap (one map node per line of a maximum-size TsBlock).
+ */
+ @Override
+ public long calculateMaxPeekMemory() {
+ long heldByEarlierChildren = 0;
+ long peakWhileChildRuns = 0;
+ for (Operator child : children) {
+ final long peakWithThisChild = heldByEarlierChildren + child.calculateMaxPeekMemory();
+ peakWhileChildRuns = Math.max(peakWhileChildRuns, peakWithThisChild);
+ heldByEarlierChildren += child.calculateMaxReturnSize();
+ heldByEarlierChildren += child.calculateRetainedSizeAfterCallingNext();
+ }
+ // result size + cached TreeMap size
+ final long resultSize = calculateMaxReturnSize();
+ final long treeMapSize =
+ TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()
+ * MAP_NODE_RETRAINED_SIZE;
+ final long peakWhileMerging = heldByEarlierChildren + (resultSize + treeMapSize);
+ return Math.max(peakWhileMerging, peakWhileChildRuns);
+ }
+
+ /** Maximum size of one returned block: the largest max return size among the children. */
+ @Override
+ public long calculateMaxReturnSize() {
+ long largestChildReturnSize = 0;
+ for (Operator child : children) {
+ final long childReturnSize = child.calculateMaxReturnSize();
+ if (childReturnSize > largestChildReturnSize) {
+ largestChildReturnSize = childReturnSize;
+ }
+ }
+ return largestChildReturnSize;
+ }
+
+ /**
+ * Retained size after next(): the largest set of child blocks that can stay cached (sum over
+ * children of max return size + retained size, minus the smallest child return size) plus the
+ * cached TreeMap (one map node per line of a maximum-size TsBlock). With no child, only the
+ * TreeMap term remains.
+ */
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ long cachedByAllChildren = 0;
+ long smallestChildReturnSize = Long.MAX_VALUE;
+ boolean hasChild = false;
+ for (Operator child : children) {
+ hasChild = true;
+ final long childReturnSize = child.calculateMaxReturnSize();
+ cachedByAllChildren += childReturnSize + child.calculateRetainedSizeAfterCallingNext();
+ smallestChildReturnSize = Math.min(smallestChildReturnSize, childReturnSize);
+ }
+ // without any child no block is cached; subtracting the Long.MAX_VALUE sentinel would yield a
+ // negative estimate
+ final long cachedBlocks = hasChild ? cachedByAllChildren - smallestChildReturnSize : 0;
+ // max cached TsBlock + cached TreeMap size
+ return cachedBlocks
+ + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()
+ * MAP_NODE_RETRAINED_SIZE;
+ }
+
/**
* If the tsBlock of columnIndex is null or has no more data in the tsBlock, return true; else
* return false;
@@ -241,6 +286,8 @@ public class LastQueryMergeOperator implements ProcessOperator {
}
private static class Location {
+
+ private static final int INSTANCE_SIZE = ClassLayout.parseClass(Location.class).instanceSize();
int tsBlockIndex;
int rowIndex;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
index 57d391d5df..60620b1f56 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
@@ -33,6 +33,7 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import static com.google.common.util.concurrent.Futures.successfulAsList;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
// collect all last query result in the same data region and there is no order guarantee
public class LastQueryOperator implements ProcessOperator {
@@ -140,4 +141,29 @@ public class LastQueryOperator implements ProcessOperator {
private int getEndIndex() {
return currentIndex + Math.min(MAX_DETECT_COUNT, inputOperatorsCount - currentIndex);
}
+
+ /**
+ * Peak-memory estimate: this operator's result block (the larger of the default maximum TsBlock
+ * size and what the builder currently retains) plus the peak estimate of the most demanding
+ * child. Stays 0 when there is no child.
+ */
+ @Override
+ public long calculateMaxPeekMemory() {
+ final long ownResultSize =
+ Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, tsBlockBuilder.getRetainedSizeInBytes());
+ long worstCase = 0;
+ for (Operator child : children) {
+ final long peakWithThisChild = ownResultSize + child.calculateMaxPeekMemory();
+ worstCase = Math.max(worstCase, peakWithThisChild);
+ }
+ return worstCase;
+ }
+
+ /**
+ * Maximum size of one returned block: the larger of the default maximum TsBlock size and what
+ * the result builder currently retains.
+ */
+ @Override
+ public long calculateMaxReturnSize() {
+ final long builderRetainedSize = tsBlockBuilder.getRetainedSizeInBytes();
+ return Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, builderRetainedSize);
+ }
+
+ /** Retained size after next(): the children's retained sizes added together. */
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ long retainedByChildren = 0;
+ for (Operator child : children) {
+ retainedByChildren = retainedByChildren + child.calculateRetainedSizeAfterCallingNext();
+ }
+ return retainedByChildren;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
index 2945856933..082173c29f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
import static com.google.common.util.concurrent.Futures.successfulAsList;
import static org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryUtil.compareTimeSeries;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
// collect all last query result in the same data region and sort them according to the
// time-series's alphabetical order
@@ -190,6 +191,32 @@ public class LastQuerySortOperator implements ProcessOperator {
return !hasNext();
}
+ /**
+ * Peak-memory estimate: one default-size result block plus the cached TsBlock, plus the peak
+ * estimate of the most demanding child. Stays 0 when there is no child.
+ */
+ @Override
+ public long calculateMaxPeekMemory() {
+ final long ownMemory =
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + cachedTsBlock.getRetainedSizeInBytes();
+ long worstCase = 0;
+ for (Operator child : children) {
+ final long peakWithThisChild = ownMemory + child.calculateMaxPeekMemory();
+ worstCase = Math.max(worstCase, peakWithThisChild);
+ }
+ return worstCase;
+ }
+
+ /** Maximum size of one returned block: the default maximum TsBlock size. */
+ @Override
+ public long calculateMaxReturnSize() {
+ final long defaultBlockSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ return defaultBlockSize;
+ }
+
+ /**
+ * Retained size after next(): the cached TsBlock, plus the largest child return size, plus the
+ * retained sizes of all children added together.
+ */
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ long largestChildReturnSize = 0;
+ long retainedByChildren = 0;
+ for (Operator child : children) {
+ largestChildReturnSize = Math.max(largestChildReturnSize, child.calculateMaxReturnSize());
+ retainedByChildren = retainedByChildren + child.calculateRetainedSizeAfterCallingNext();
+ }
+ final long cachedBlockSize = cachedTsBlock.getRetainedSizeInBytes();
+ return cachedBlockSize + largestChildReturnSize + retainedByChildren;
+ }
+
private int getEndIndex() {
return currentIndex + Math.min(MAX_DETECT_COUNT, inputOperatorsCount - currentIndex);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
index 93a5d81cc3..44e423424e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
@@ -131,4 +131,19 @@ public class UpdateLastCacheOperator implements ProcessOperator {
public void close() throws Exception {
child.close();
}
+
+ /** Peak-memory estimate: the child's estimate is reported unchanged, no extra term is added. */
+ @Override
+ public long calculateMaxPeekMemory() {
+ final long childPeakMemory = child.calculateMaxPeekMemory();
+ return childPeakMemory;
+ }
+
+ /** Maximum size of one returned block: identical to what the child reports. */
+ @Override
+ public long calculateMaxReturnSize() {
+ final long childReturnSize = child.calculateMaxReturnSize();
+ return childReturnSize;
+ }
+
+ /** Retained size after next(): only what the child reports as retained is counted. */
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ final long retainedByChild = child.calculateRetainedSizeAfterCallingNext();
+ return retainedByChild;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
index a9176d658c..7524b7c455 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
@@ -133,4 +133,33 @@ public class CountMergeOperator implements ProcessOperator {
public boolean isFinished() {
return isFinished;
}
+
+ /** Peak-memory estimate: the largest peak estimate among the children. */
+ @Override
+ public long calculateMaxPeekMemory() {
+ long largestChildPeak = 0;
+ for (Operator child : children) {
+ final long childPeak = child.calculateMaxPeekMemory();
+ if (childPeak > largestChildPeak) {
+ largestChildPeak = childPeak;
+ }
+ }
+ return largestChildPeak;
+ }
+
+ /** Maximum size of one returned block: the largest max return size among the children. */
+ @Override
+ public long calculateMaxReturnSize() {
+ long largestChildReturnSize = 0;
+ for (Operator child : children) {
+ final long childReturnSize = child.calculateMaxReturnSize();
+ if (childReturnSize > largestChildReturnSize) {
+ largestChildReturnSize = childReturnSize;
+ }
+ }
+ return largestChildReturnSize;
+ }
+
+ /** Retained size after next(): the children's retained sizes added together. */
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ long retainedByChildren = 0L;
+ for (Operator child : children) {
+ retainedByChildren = retainedByChildren + child.calculateRetainedSizeAfterCallingNext();
+ }
+ return retainedByChildren;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java
index ca69fef36f..3e9bd02c02 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java
@@ -97,4 +97,21 @@ public class DevicesCountOperator implements SourceOperator {
public boolean isFinished() {
return isFinished;
}
+
+ /** Peak-memory estimate: just the single int that holds the count. */
+ @Override
+ public long calculateMaxPeekMemory() {
+ // size in bytes of the integer used for the count (4)
+ return Integer.BYTES;
+ }
+
+ /** Maximum size of one returned block: just the single int that holds the count. */
+ @Override
+ public long calculateMaxReturnSize() {
+ // size in bytes of the integer used for the count (4)
+ return Integer.BYTES;
+ }
+
+ /** Retained size after next(): reported as zero by this source operator. */
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ final long nothingRetained = 0L;
+ return nothingRetained;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
index 1630af74a2..aa5d865014 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
@@ -36,6 +36,8 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
public class LevelTimeSeriesCountOperator implements SourceOperator {
private final PlanNodeId sourceId;
private final OperatorContext operatorContext;
@@ -124,4 +126,19 @@ public class LevelTimeSeriesCountOperator implements SourceOperator {
public boolean isFinished() {
return isFinished;
}
+
+ /** Peak-memory estimate: one TsBlock of the default maximum size. */
+ @Override
+ public long calculateMaxPeekMemory() {
+ final long defaultBlockSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ return defaultBlockSize;
+ }
+
+ /** Maximum size of one returned block: the default maximum TsBlock size. */
+ @Override
+ public long calculateMaxReturnSize() {
+ final long defaultBlockSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ return defaultBlockSize;
+ }
+
+ /** Retained size after next(): reported as zero by this source operator. */
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ final long nothingRetained = 0L;
+ return nothingRetained;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
index a4510ed53c..781ba46695 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
@@ -40,6 +40,7 @@ import java.util.TreeSet;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
public class NodeManageMemoryMergeOperator implements ProcessOperator {
private final OperatorContext operatorContext;
@@ -137,4 +138,21 @@ public class NodeManageMemoryMergeOperator implements ProcessOperator {
public boolean isFinished() {
return !isReadingMemory && child.isFinished();
}
+
+ /**
+ * Peak-memory estimate: the larger of two default-size TsBlocks and the child's peak estimate.
+ *
+ * <p>TODO: calculate the result based on all the scan nodes; currently this is shadowed by
+ * schemaQueryMergeNode.
+ */
+ @Override
+ public long calculateMaxPeekMemory() {
+ final long twoDefaultBlocks = 2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ final long childPeakMemory = child.calculateMaxPeekMemory();
+ return Math.max(twoDefaultBlocks, childPeakMemory);
+ }
+
+ /**
+ * Maximum size of one returned block: the larger of the default maximum TsBlock size and the
+ * child's max return size.
+ */
+ @Override
+ public long calculateMaxReturnSize() {
+ final long childReturnSize = child.calculateMaxReturnSize();
+ return Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, childReturnSize);
+ }
+
+ /** Retained size after next(): one default-size TsBlock plus what the child retains. */
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ final long retainedByChild = child.calculateRetainedSizeAfterCallingNext();
+ return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + retainedByChild;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
index 76ec17b585..85bed1cf20 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
@@ -107,4 +107,19 @@ public class NodePathsConvertOperator implements ProcessOperator {
public boolean isFinished() {
return child.isFinished();
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return child.calculateMaxReturnSize() + child.calculateMaxPeekMemory();
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return child.calculateMaxReturnSize();
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return child.calculateRetainedSizeAfterCallingNext();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
index 79b3af2c41..6690848db1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
@@ -36,6 +36,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
public class NodePathsCountOperator implements ProcessOperator {
@@ -107,4 +108,21 @@ public class NodePathsCountOperator implements ProcessOperator {
public boolean isFinished() {
return isFinished;
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ // TODO: calculate the result based on all the scan nodes; currently, this is shadowed by
+ // schemaQueryMergeNode
+ return Math.max(2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, child.calculateMaxPeekMemory());
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, child.calculateMaxReturnSize());
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + child.calculateRetainedSizeAfterCallingNext();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java
index 1daade8dc3..6d1eea794e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java
@@ -38,6 +38,8 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
public class NodePathsSchemaScanOperator implements SourceOperator {
private final PlanNodeId sourceId;
@@ -128,4 +130,19 @@ public class NodePathsSchemaScanOperator implements SourceOperator {
public PlanNodeId getSourceId() {
return sourceId;
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0L;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
index 92b023eabb..96ff8cbc11 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
@@ -122,4 +122,33 @@ public class SchemaFetchMergeOperator implements ProcessOperator {
new BinaryColumn(
1, Optional.empty(), new Binary[] {new Binary(outputStream.toByteArray())}));
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ long childrenMaxPeekMemory = 0;
+ for (Operator child : children) {
+ childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+ }
+
+ return childrenMaxPeekMemory;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ long childrenMaxReturnSize = 0;
+ for (Operator child : children) {
+ childrenMaxReturnSize = Math.max(childrenMaxReturnSize, child.calculateMaxReturnSize());
+ }
+
+ return childrenMaxReturnSize;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ long retainedSize = 0L;
+ for (Operator child : children) {
+ retainedSize += child.calculateRetainedSizeAfterCallingNext();
+ }
+ return retainedSize;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
index 3a22065879..4e07c4178c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
@@ -44,6 +44,8 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
public class SchemaFetchScanOperator implements SourceOperator {
private static final Logger logger = LoggerFactory.getLogger(SchemaFetchScanOperator.class);
@@ -55,7 +57,6 @@ public class SchemaFetchScanOperator implements SourceOperator {
private final ISchemaRegion schemaRegion;
- private TsBlock tsBlock;
private boolean isFinished = false;
public SchemaFetchScanOperator(
@@ -83,12 +84,11 @@ public class SchemaFetchScanOperator implements SourceOperator {
}
isFinished = true;
try {
- fetchSchema();
+ return fetchSchema();
} catch (MetadataException e) {
logger.error("Error occurred during execute SchemaFetchOperator {}", sourceId, e);
throw new RuntimeException(e);
}
- return tsBlock;
}
@Override
@@ -106,7 +106,7 @@ public class SchemaFetchScanOperator implements SourceOperator {
return sourceId;
}
- private void fetchSchema() throws MetadataException {
+ private TsBlock fetchSchema() throws MetadataException {
ClusterSchemaTree schemaTree = new ClusterSchemaTree();
List<PartialPath> partialPathList = patternTree.getAllPathPatterns();
for (PartialPath path : partialPathList) {
@@ -122,10 +122,24 @@ public class SchemaFetchScanOperator implements SourceOperator {
} catch (IOException e) {
// Totally memory operation. This case won't happen.
}
- this.tsBlock =
- new TsBlock(
- new TimeColumn(1, new long[] {0}),
- new BinaryColumn(
- 1, Optional.empty(), new Binary[] {new Binary(outputStream.toByteArray())}));
+ return new TsBlock(
+ new TimeColumn(1, new long[] {0}),
+ new BinaryColumn(
+ 1, Optional.empty(), new Binary[] {new Binary(outputStream.toByteArray())}));
+ }
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0L;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
index 519cdce27e..f2671034e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
@@ -80,4 +80,33 @@ public class SchemaQueryMergeOperator implements ProcessOperator {
child.close();
}
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ long childrenMaxPeekMemory = 0;
+ for (Operator child : children) {
+ childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+ }
+
+ return childrenMaxPeekMemory;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ long childrenMaxReturnSize = 0;
+ for (Operator child : children) {
+ childrenMaxReturnSize = Math.max(childrenMaxReturnSize, child.calculateMaxReturnSize());
+ }
+
+ return childrenMaxReturnSize;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ long retainedSize = 0L;
+ for (Operator child : children) {
+ retainedSize += child.calculateRetainedSizeAfterCallingNext();
+ }
+ return retainedSize;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
index f52cbf62a4..eb3c1f4ec2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
@@ -186,4 +186,44 @@ public class SchemaQueryOrderByHeatOperator implements ProcessOperator {
public boolean isFinished() {
return isFinished;
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ long maxPeekMemory = 0;
+
+ for (Operator child : operators) {
+ maxPeekMemory += child.calculateMaxReturnSize();
+ }
+
+ for (Operator child : operators) {
+ maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory());
+ }
+
+ return maxPeekMemory;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ long maxReturnSize = 0;
+
+ for (Operator child : operators) {
+ maxReturnSize += child.calculateMaxReturnSize();
+ }
+
+ return maxReturnSize;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ long retainedSize = 0L;
+
+ for (Operator child : operators) {
+ retainedSize += child.calculateMaxReturnSize();
+ }
+
+ for (Operator child : operators) {
+ retainedSize += child.calculateRetainedSizeAfterCallingNext();
+ }
+ return retainedSize;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
index b318c5e7db..4493dc58a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
@@ -24,11 +24,13 @@ import org.apache.iotdb.db.mpp.execution.operator.source.SourceOperator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
public abstract class SchemaQueryScanOperator implements SourceOperator {
protected OperatorContext operatorContext;
protected TsBlock tsBlock;
- private boolean hasCachedTsBlock;
+ protected boolean isFinished = false;
protected int limit;
protected int offset;
@@ -85,19 +87,25 @@ public abstract class SchemaQueryScanOperator implements SourceOperator {
@Override
public TsBlock next() {
- hasCachedTsBlock = false;
- return tsBlock;
+ isFinished = true;
+ TsBlock result = tsBlock;
+ tsBlock = null;
+ return result;
}
@Override
public boolean hasNext() {
+ if (isFinished) {
+ return false;
+ }
if (tsBlock == null) {
tsBlock = createTsBlock();
- if (tsBlock.getPositionCount() > 0) {
- hasCachedTsBlock = true;
+ if (tsBlock.getPositionCount() == 0) {
+ isFinished = true;
+ return false;
}
}
- return hasCachedTsBlock;
+ return true;
}
@Override
@@ -109,4 +117,19 @@ public abstract class SchemaQueryScanOperator implements SourceOperator {
public PlanNodeId getSourceId() {
return sourceId;
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0L;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java
index c87f2ee765..299961a561 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java
@@ -113,4 +113,21 @@ public class TimeSeriesCountOperator implements SourceOperator {
public boolean isFinished() {
return isFinished;
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ // the integer used for count
+ return 4L;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ // the integer used for count
+ return 4L;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0L;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index 2d7d671be9..42ec93cfca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.TimeRange;
@@ -40,7 +41,6 @@ import java.util.concurrent.TimeUnit;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateAggregationFromRawData;
-import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.isAllAggregatorsHasFinalResult;
public abstract class AbstractSeriesAggregationScanOperator implements DataSourceOperator {
@@ -68,14 +68,19 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
protected boolean finished = false;
+ private final long maxRetainedSize;
+ private final long maxReturnSize;
+
public AbstractSeriesAggregationScanOperator(
PlanNodeId sourceId,
OperatorContext context,
SeriesScanUtil seriesScanUtil,
int subSensorSize,
List<Aggregator> aggregators,
+ ITimeRangeIterator timeRangeIterator,
boolean ascending,
- GroupByTimeParameter groupByTimeParameter) {
+ GroupByTimeParameter groupByTimeParameter,
+ long maxReturnSize) {
this.sourceId = sourceId;
this.operatorContext = context;
this.ascending = ascending;
@@ -83,14 +88,17 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
this.seriesScanUtil = seriesScanUtil;
this.subSensorSize = subSensorSize;
this.aggregators = aggregators;
-
- this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending, true);
+ this.timeRangeIterator = timeRangeIterator;
List<TSDataType> dataTypes = new ArrayList<>();
for (Aggregator aggregator : aggregators) {
dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
}
this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+
+ this.maxRetainedSize =
+ (1L + subSensorSize) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ this.maxReturnSize = maxReturnSize;
}
@Override
@@ -108,6 +116,21 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
seriesScanUtil.initQueryDataSource(dataSource);
}
+ @Override
+ public long calculateMaxPeekMemory() {
+ return maxRetainedSize + maxReturnSize;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return maxReturnSize;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return maxRetainedSize;
+ }
+
@Override
public boolean hasNext() {
return timeRangeIterator.hasNextTimeRange();
@@ -119,9 +142,6 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
long start = System.nanoTime();
- // reset operator state
- resultTsBlockBuilder.reset();
-
while (System.nanoTime() - start < maxRuntime
&& timeRangeIterator.hasNextTimeRange()
&& !resultTsBlockBuilder.isFull()) {
@@ -138,7 +158,9 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
}
if (resultTsBlockBuilder.getPositionCount() > 0) {
- return resultTsBlockBuilder.build();
+ TsBlock resultTsBlock = resultTsBlockBuilder.build();
+ resultTsBlockBuilder.reset();
+ return resultTsBlock;
} else {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
index f9fa1f8d52..2ad3eb1a83 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregationScanOperator.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.operator.source;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
@@ -37,9 +38,11 @@ public class AlignedSeriesAggregationScanOperator extends AbstractSeriesAggregat
AlignedPath seriesPath,
OperatorContext context,
List<Aggregator> aggregators,
+ ITimeRangeIterator timeRangeIterator,
Filter timeFilter,
boolean ascending,
- GroupByTimeParameter groupByTimeParameter) {
+ GroupByTimeParameter groupByTimeParameter,
+ long maxReturnSize) {
super(
sourceId,
context,
@@ -52,7 +55,9 @@ public class AlignedSeriesAggregationScanOperator extends AbstractSeriesAggregat
ascending),
seriesPath.getMeasurementList().size(),
aggregators,
+ timeRangeIterator,
ascending,
- groupByTimeParameter);
+ groupByTimeParameter,
+ maxReturnSize);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
index c47ab9f95d..c64947e4b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -37,6 +38,8 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
private boolean hasCachedTsBlock = false;
private boolean finished = false;
+ private final long maxReturnSize;
+
public AlignedSeriesScanOperator(
PlanNodeId sourceId,
AlignedPath seriesPath,
@@ -54,6 +57,10 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
timeFilter,
valueFilter,
ascending);
+ // time + all value columns
+ this.maxReturnSize =
+ (1L + seriesPath.getMeasurementList().size())
+ * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
}
@Override
@@ -65,7 +72,9 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
public TsBlock next() {
if (hasCachedTsBlock || hasNext()) {
hasCachedTsBlock = false;
- return tsBlock;
+ TsBlock res = tsBlock;
+ tsBlock = null;
+ return res;
}
throw new IllegalStateException("no next batch");
}
@@ -114,6 +123,21 @@ public class AlignedSeriesScanOperator implements DataSourceOperator {
return finished || (finished = !hasNext());
}
+ @Override
+ public long calculateMaxPeekMemory() {
+ return maxReturnSize;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return maxReturnSize;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0L;
+ }
+
private boolean readChunkData() throws IOException {
while (seriesScanUtil.hasNextChunk()) {
if (readPageData()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
index c72063caeb..110079809b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
public class ExchangeOperator implements SourceOperator {
private final OperatorContext operatorContext;
@@ -62,6 +64,21 @@ public class ExchangeOperator implements SourceOperator {
return sourceHandle.isFinished();
}
+ @Override
+ public long calculateMaxPeekMemory() {
+ return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0L;
+ }
+
@Override
public PlanNodeId getSourceId() {
return sourceId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
index dfb6d82c5c..974758f8a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
@@ -57,6 +57,21 @@ public class LastCacheScanOperator implements SourceOperator {
return !hasNext();
}
+ @Override
+ public long calculateMaxPeekMemory() {
+ return tsBlock.getRetainedSizeInBytes();
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return tsBlock.getRetainedSizeInBytes();
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0L;
+ }
+
@Override
public PlanNodeId getSourceId() {
return sourceId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
index 157f51f2fa..99b24e06a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution.operator.source;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
@@ -44,9 +45,11 @@ public class SeriesAggregationScanOperator extends AbstractSeriesAggregationScan
Set<String> allSensors,
OperatorContext context,
List<Aggregator> aggregators,
+ ITimeRangeIterator timeRangeIterator,
Filter timeFilter,
boolean ascending,
- GroupByTimeParameter groupByTimeParameter) {
+ GroupByTimeParameter groupByTimeParameter,
+ long maxReturnSize) {
super(
sourceId,
context,
@@ -60,7 +63,9 @@ public class SeriesAggregationScanOperator extends AbstractSeriesAggregationScan
ascending),
1,
aggregators,
+ timeRangeIterator,
ascending,
- groupByTimeParameter);
+ groupByTimeParameter,
+ maxReturnSize);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
index f74d14a2f6..05685f758d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -38,6 +39,8 @@ public class SeriesScanOperator implements DataSourceOperator {
private boolean hasCachedTsBlock = false;
private boolean finished = false;
+ private final long maxReturnSize;
+
public SeriesScanOperator(
PlanNodeId sourceId,
PartialPath seriesPath,
@@ -58,6 +61,7 @@ public class SeriesScanOperator implements DataSourceOperator {
timeFilter,
valueFilter,
ascending);
+ this.maxReturnSize = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
}
@Override
@@ -69,7 +73,9 @@ public class SeriesScanOperator implements DataSourceOperator {
public TsBlock next() {
if (hasCachedTsBlock || hasNext()) {
hasCachedTsBlock = false;
- return tsBlock;
+ TsBlock res = tsBlock;
+ tsBlock = null;
+ return res;
}
throw new IllegalStateException("no next batch");
}
@@ -118,6 +124,21 @@ public class SeriesScanOperator implements DataSourceOperator {
return finished || (finished = !hasNext());
}
+ @Override
+ public long calculateMaxPeekMemory() {
+ return maxReturnSize;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return maxReturnSize;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0L;
+ }
+
private boolean readChunkData() throws IOException {
while (seriesScanUtil.hasNextChunk()) {
if (readPageData()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index af6b72e47a..22fd2cbabc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -618,7 +618,9 @@ public class SeriesScanUtil {
if (hasCachedNextOverlappedPage) {
hasCachedNextOverlappedPage = false;
- return cachedTsBlock;
+ TsBlock res = cachedTsBlock;
+ cachedTsBlock = null;
+ return res;
} else {
/*
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/GroupByLevelController.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/GroupByLevelController.java
index 4f8da81f15..ce6dfa03da 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/GroupByLevelController.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/GroupByLevelController.java
@@ -28,6 +28,8 @@ import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression;
import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.util.ArrayList;
@@ -84,8 +86,9 @@ public class GroupByLevelController {
PartialPath rawPath = ((TimeSeriesOperand) expression.getExpressions().get(0)).getPath();
PartialPath groupedPath = generatePartialPathByLevel(isCountStar, rawPath, levels);
- checkDatatypeConsistency(
- groupedPath.getFullPath(), ((FunctionExpression) expression).getFunctionName(), rawPath);
+ String functionName = ((FunctionExpression) expression).getFunctionName();
+ checkDatatypeConsistency(groupedPath.getFullPath(), functionName, rawPath);
+ updateTypeProvider(functionName, groupedPath.getFullPath(), rawPath);
Expression rawPathExpression = new TimeSeriesOperand(rawPath);
Expression groupedPathExpression = new TimeSeriesOperand(groupedPath);
@@ -122,9 +125,7 @@ public class GroupByLevelController {
case SQLConstant.COUNT:
case SQLConstant.AVG:
case SQLConstant.SUM:
- try {
- typeProvider.getType(groupedPath);
- } catch (StatementAnalyzeException e) {
+ if (!typeProvider.containsTypeInfoOf(groupedPath)) {
typeProvider.setType(groupedPath, rawPath.getSeriesType());
}
return;
@@ -133,7 +134,9 @@ public class GroupByLevelController {
case SQLConstant.FIRST_VALUE:
case SQLConstant.MAX_VALUE:
case SQLConstant.EXTREME:
- try {
+ if (!typeProvider.containsTypeInfoOf(groupedPath)) {
+ typeProvider.setType(groupedPath, rawPath.getSeriesType());
+ } else {
TSDataType tsDataType = typeProvider.getType(groupedPath);
if (tsDataType != rawPath.getSeriesType()) {
throw new SemanticException(
@@ -141,8 +144,6 @@ public class GroupByLevelController {
"GROUP BY LEVEL: the data types of the same output column[%s] should be the same.",
groupedPath));
}
- } catch (StatementAnalyzeException e) {
- typeProvider.setType(groupedPath, rawPath.getSeriesType());
}
return;
default:
@@ -224,4 +225,15 @@ public class GroupByLevelController {
public Map<Expression, Expression> getRawPathToGroupedPathMap() {
return rawPathToGroupedPathMap;
}
+
+ private void updateTypeProvider(String functionName, String groupedPath, PartialPath rawPath) {
+ List<AggregationType> splitAggregations =
+ SchemaUtils.splitPartialAggregation(AggregationType.valueOf(functionName.toUpperCase()));
+ for (AggregationType aggregationType : splitAggregations) {
+ String splitFunctionName = aggregationType.toString().toLowerCase();
+ typeProvider.setType(
+ String.format("%s(%s)", splitFunctionName, groupedPath),
+ SchemaUtils.getSeriesTypeByPath(rawPath, splitFunctionName));
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/TypeProvider.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/TypeProvider.java
index b196d04a2b..6314102a5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/TypeProvider.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/TypeProvider.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.plan.analyze;
-import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -30,6 +29,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import static com.google.common.base.Preconditions.checkState;
+
public class TypeProvider {
private final Map<String, TSDataType> typeMap;
@@ -43,17 +44,14 @@ public class TypeProvider {
}
+ /**
+ * Returns the data type registered for {@code path}.
+ *
+ * @param path key to look up in the type map
+ * @return the data type stored for {@code path}
+ * @throws IllegalStateException if the type map has no entry for {@code path}
+ */
public TSDataType getType(String path) {
- if (!typeMap.containsKey(path)) {
- throw new StatementAnalyzeException(String.format("no data type found for path: %s", path));
- }
+ // template overload: the message is only built when the check fails
+ checkState(typeMap.containsKey(path), "no data type found for path: %s", path);
return typeMap.get(path);
}
+ /**
+ * Registers {@code dataType} for {@code path}. Re-registering the same type is allowed.
+ *
+ * @param path key to store the type under
+ * @param dataType data type to associate with {@code path}
+ * @throws IllegalStateException if {@code path} is already mapped to a different data type
+ */
public void setType(String path, TSDataType dataType) {
- if (typeMap.containsKey(path) && typeMap.get(path) != dataType) {
- throw new StatementAnalyzeException(
- String.format("inconsistent data type for path: %s", path));
- }
+ // template overload: the message is only built when the check fails
+ checkState(
+ !typeMap.containsKey(path) || typeMap.get(path) == dataType,
+ "inconsistent data type for path: %s",
+ path);
this.typeMap.put(path, dataType);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 8188612e61..f7ba31d0d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -18,19 +18,27 @@
*/
package org.apache.iotdb.db.mpp.plan.planner;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
+import org.apache.iotdb.db.mpp.exception.MemoryNotEnoughException;
import org.apache.iotdb.db.mpp.execution.driver.DataDriver;
import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.timer.ITimeSliceAllocator;
import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import io.airlift.concurrent.SetThreadName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Used to plan a fragment instance. Currently, we simply change it from PlanNode to executable
* Operator tree, but in the future, we may split one fragment instance into multiple pipeline to
@@ -38,6 +46,12 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
*/
public class LocalExecutionPlanner {
+ private static final Logger LOGGER = LoggerFactory.getLogger(LocalExecutionPlanner.class);
+
+ /** memory still free for operator execution; starts at the configured allocation and is adjusted in checkMemory */
+ private long freeMemoryForOperators =
+ IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForOperators();
+
public static LocalExecutionPlanner getInstance() {
return InstanceHolder.INSTANCE;
}
@@ -47,11 +61,15 @@ public class LocalExecutionPlanner {
TypeProvider types,
FragmentInstanceContext instanceContext,
Filter timeFilter,
- DataRegion dataRegion) {
+ DataRegion dataRegion)
+ throws MemoryNotEnoughException {
LocalExecutionPlanContext context = new LocalExecutionPlanContext(types, instanceContext);
Operator root = plan.accept(new OperatorTreeGenerator(), context);
+ // check whether current free memory is enough to execute current query
+ checkMemory(root, instanceContext.getStateMachine());
+
ITimeSliceAllocator timeSliceAllocator = context.getTimeSliceAllocator();
instanceContext
.getOperatorContexts()
@@ -71,7 +89,8 @@ public class LocalExecutionPlanner {
}
public SchemaDriver plan(
- PlanNode plan, FragmentInstanceContext instanceContext, ISchemaRegion schemaRegion) {
+ PlanNode plan, FragmentInstanceContext instanceContext, ISchemaRegion schemaRegion)
+ throws MemoryNotEnoughException {
SchemaDriverContext schemaDriverContext =
new SchemaDriverContext(instanceContext, schemaRegion);
@@ -81,6 +100,9 @@ public class LocalExecutionPlanner {
Operator root = plan.accept(new OperatorTreeGenerator(), context);
+ // check whether current free memory is enough to execute current query
+ checkMemory(root, instanceContext.getStateMachine());
+
ITimeSliceAllocator timeSliceAllocator = context.getTimeSliceAllocator();
instanceContext
.getOperatorContexts()
@@ -91,6 +113,49 @@ public class LocalExecutionPlanner {
return new SchemaDriver(root, context.getSinkHandle(), schemaDriverContext);
}
+  /**
+   * Reserves the estimated peak memory of the operator tree rooted at {@code root} from {@code
+   * freeMemoryForOperators} and registers a state-change listener that gives the reservation back
+   * once the fragment instance reports a done state.
+   *
+   * <p>Returns immediately, without reserving anything, when query memory estimation is disabled
+   * in the configuration.
+   *
+   * @param root root of the operator tree whose memory usage is estimated
+   * @param stateMachine state machine of the fragment instance, used to release the reservation
+   * @throws MemoryNotEnoughException if the estimate exceeds the memory that is currently free
+   */
+  private void checkMemory(Operator root, FragmentInstanceStateMachine stateMachine)
+      throws MemoryNotEnoughException {
+
+    // if it is disabled, just return
+    if (!IoTDBDescriptor.getInstance().getConfig().isEnableQueryMemoryEstimation()) {
+      return;
+    }
+
+    long estimatedMemorySize = root.calculateMaxPeekMemory();
+
+    // the check and the reservation happen under one lock so they cannot interleave
+    synchronized (this) {
+      if (estimatedMemorySize > freeMemoryForOperators) {
+        throw new MemoryNotEnoughException(
+            String.format(
+                "There is not enough memory to execute current fragment instance, current remaining free memory is %d, estimated memory usage for current fragment instance is %d",
+                freeMemoryForOperators, estimatedMemorySize),
+            TSStatusCode.MEMORY_NOT_ENOUGH.getStatusCode());
+      }
+      freeMemoryForOperators -= estimatedMemorySize;
+      // parameterized logging: no message is formatted when INFO is disabled
+      LOGGER.info(
+          "consume memory: {}, current remaining memory: {}",
+          estimatedMemorySize,
+          freeMemoryForOperators);
+    }
+
+    stateMachine.addStateChangeListener(
+        newState -> {
+          if (!newState.isDone()) {
+            return;
+          }
+          // the resource variable is never read; it only exists for the try-with-resources scope
+          try (SetThreadName ignored =
+              new SetThreadName(stateMachine.getFragmentInstanceId().getFullId())) {
+            synchronized (this) {
+              freeMemoryForOperators += estimatedMemorySize;
+              LOGGER.info(
+                  "release memory: {}, current remaining memory: {}",
+                  estimatedMemorySize,
+                  freeMemoryForOperators);
+            }
+          }
+        });
+  }
+
private static class InstanceHolder {
private InstanceHolder() {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index 8ec8817486..589cc3d95e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -471,9 +471,8 @@ public class LogicalPlanBuilder {
constructAggregationDescriptorList(aggregationExpressions, curStep);
if (curStep.isOutputPartial()) {
aggregationDescriptorList.forEach(
- aggregationDescriptor -> {
- updateTypeProviderByPartialAggregation(aggregationDescriptor, typeProvider);
- });
+ aggregationDescriptor ->
+ updateTypeProviderByPartialAggregation(aggregationDescriptor, typeProvider));
}
this.root =
new AggregationNode(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 214d024741..07a8e37051 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -27,12 +27,14 @@ import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.aggregation.slidingwindow.SlidingWindowAggregatorFactory;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
import org.apache.iotdb.db.mpp.execution.exchange.ISourceHandle;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
+import org.apache.iotdb.db.mpp.execution.operator.AggregationUtil;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.execution.operator.process.AggregationOperator;
@@ -150,6 +152,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OutputColumn;
import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
@@ -185,6 +188,9 @@ import java.util.Objects;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateMaxAggregationResultSize;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateMaxAggregationResultSizeForLastQuery;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
import static org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode;
/** This Visitor is responsible for transferring PlanNode Tree to Operator Tree */
@@ -272,6 +278,55 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
return seriesScanOperator;
}
+  /**
+   * Translates a {@link SeriesAggregationScanNode} into a {@link SeriesAggregationScanOperator}.
+   *
+   * <p>One {@link Aggregator} is created per aggregation descriptor of the node, the time range
+   * iterator is built from the node's group-by-time parameter, and the maximum aggregation result
+   * size is computed up front and passed to the operator. The new operator and its series path are
+   * then registered in {@code context}.
+   *
+   * @param node plan node providing the series path, scan order, time filter and aggregations
+   * @param context planning context providing operator ids, the type provider and sensors
+   * @return the operator created for {@code node}
+   */
+  @Override
+  public Operator visitSeriesAggregationScan(
+      SeriesAggregationScanNode node, LocalExecutionPlanContext context) {
+    PartialPath path = node.getSeriesPath();
+    boolean isAscending = node.getScanOrder() == Ordering.ASC;
+    OperatorContext scanContext =
+        context
+            .getInstanceContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                SeriesAggregationScanOperator.class.getSimpleName());
+
+    // one aggregator per descriptor, in descriptor order
+    List<AggregationDescriptor> descriptors = node.getAggregationDescriptorList();
+    List<Aggregator> aggregatorList = new ArrayList<>();
+    for (AggregationDescriptor descriptor : descriptors) {
+      aggregatorList.add(
+          new Aggregator(
+              AccumulatorFactory.createAccumulator(
+                  descriptor.getAggregationType(),
+                  node.getSeriesPath().getSeriesType(),
+                  isAscending),
+              descriptor.getStep()));
+    }
+
+    GroupByTimeParameter timeParameter = node.getGroupByTimeParameter();
+    ITimeRangeIterator rangeIterator = initTimeRangeIterator(timeParameter, isAscending, true);
+    long maxResultSize =
+        AggregationUtil.calculateMaxAggregationResultSize(
+            node.getAggregationDescriptorList(), rangeIterator, context.getTypeProvider());
+
+    SeriesAggregationScanOperator scanOperator =
+        new SeriesAggregationScanOperator(
+            node.getPlanNodeId(),
+            path,
+            context.getAllSensors(path.getDevice(), path.getMeasurement()),
+            scanContext,
+            aggregatorList,
+            rangeIterator,
+            node.getTimeFilter(),
+            isAscending,
+            node.getGroupByTimeParameter(),
+            maxResultSize);
+
+    context.addSourceOperator(scanOperator);
+    context.addPath(path);
+    context.getTimeSliceAllocator().recordExecutionWeight(scanContext, aggregatorList.size());
+    return scanOperator;
+  }
+
@Override
public Operator visitAlignedSeriesAggregationScan(
AlignedSeriesAggregationScanNode node, LocalExecutionPlanContext context) {
@@ -307,15 +362,24 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
Collections.singletonList(new InputLocation[] {new InputLocation(0, seriesIndex)})));
}
+ GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+ ITimeRangeIterator timeRangeIterator =
+ initTimeRangeIterator(groupByTimeParameter, ascending, true);
+ long maxReturnSize =
+ AggregationUtil.calculateMaxAggregationResultSize(
+ node.getAggregationDescriptorList(), timeRangeIterator, context.getTypeProvider());
+
AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
new AlignedSeriesAggregationScanOperator(
node.getPlanNodeId(),
seriesPath,
operatorContext,
aggregators,
+ timeRangeIterator,
node.getTimeFilter(),
ascending,
- node.getGroupByTimeParameter());
+ groupByTimeParameter,
+ maxReturnSize);
context.addSourceOperator(seriesAggregationScanOperator);
context.addPath(seriesPath);
@@ -555,47 +619,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
return new NodePathsCountOperator(operatorContext, child);
}
- @Override
- public Operator visitSeriesAggregationScan(
- SeriesAggregationScanNode node, LocalExecutionPlanContext context) {
- PartialPath seriesPath = node.getSeriesPath();
- boolean ascending = node.getScanOrder() == Ordering.ASC;
- OperatorContext operatorContext =
- context
- .getInstanceContext()
- .addOperatorContext(
- context.getNextOperatorId(),
- node.getPlanNodeId(),
- SeriesAggregationScanOperator.class.getSimpleName());
-
- List<Aggregator> aggregators = new ArrayList<>();
- node.getAggregationDescriptorList()
- .forEach(
- o ->
- aggregators.add(
- new Aggregator(
- AccumulatorFactory.createAccumulator(
- o.getAggregationType(),
- node.getSeriesPath().getSeriesType(),
- ascending),
- o.getStep())));
- SeriesAggregationScanOperator aggregateScanOperator =
- new SeriesAggregationScanOperator(
- node.getPlanNodeId(),
- seriesPath,
- context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()),
- operatorContext,
- aggregators,
- node.getTimeFilter(),
- ascending,
- node.getGroupByTimeParameter());
-
- context.addSourceOperator(aggregateScanOperator);
- context.addPath(seriesPath);
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
- return aggregateScanOperator;
- }
-
@Override
public Operator visitDeviceView(DeviceViewNode node, LocalExecutionPlanContext context) {
OperatorContext operatorContext =
@@ -1016,7 +1039,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
boolean ascending = node.getScanOrder() == Ordering.ASC;
List<Aggregator> aggregators = new ArrayList<>();
Map<String, List<InputLocation>> layout = makeLayout(node);
- for (GroupByLevelDescriptor descriptor : node.getGroupByLevelDescriptors()) {
+ List<GroupByLevelDescriptor> aggregationDescriptors = node.getGroupByLevelDescriptors();
+ for (GroupByLevelDescriptor descriptor : aggregationDescriptors) {
List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout);
TSDataType seriesDataType =
context
@@ -1038,9 +1062,16 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
node.getPlanNodeId(),
AggregationOperator.class.getSimpleName());
+ GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+ ITimeRangeIterator timeRangeIterator =
+ initTimeRangeIterator(groupByTimeParameter, ascending, false);
+ long maxReturnSize =
+ calculateMaxAggregationResultSize(
+ aggregationDescriptors, timeRangeIterator, context.getTypeProvider());
+
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
return new AggregationOperator(
- operatorContext, aggregators, children, ascending, node.getGroupByTimeParameter(), false);
+ operatorContext, aggregators, timeRangeIterator, children, maxReturnSize);
}
@Override
@@ -1060,7 +1091,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
boolean ascending = node.getScanOrder() == Ordering.ASC;
List<Aggregator> aggregators = new ArrayList<>();
Map<String, List<InputLocation>> layout = makeLayout(node);
- for (AggregationDescriptor descriptor : node.getAggregationDescriptorList()) {
+ List<AggregationDescriptor> aggregationDescriptors = node.getAggregationDescriptorList();
+ for (AggregationDescriptor descriptor : aggregationDescriptors) {
List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout);
aggregators.add(
SlidingWindowAggregatorFactory.createSlidingWindowAggregator(
@@ -1074,9 +1106,22 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
descriptor.getStep()));
}
+ GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+ ITimeRangeIterator timeRangeIterator =
+ initTimeRangeIterator(groupByTimeParameter, ascending, false);
+ long maxReturnSize =
+ calculateMaxAggregationResultSize(
+ aggregationDescriptors, timeRangeIterator, context.getTypeProvider());
+
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
return new SlidingWindowAggregationOperator(
- operatorContext, aggregators, child, ascending, node.getGroupByTimeParameter());
+ operatorContext,
+ aggregators,
+ timeRangeIterator,
+ child,
+ ascending,
+ groupByTimeParameter,
+ maxReturnSize);
}
@Override
@@ -1121,6 +1166,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
boolean ascending = node.getScanOrder() == Ordering.ASC;
List<Aggregator> aggregators = new ArrayList<>();
Map<String, List<InputLocation>> layout = makeLayout(node);
+ List<AggregationDescriptor> aggregationDescriptors = node.getAggregationDescriptorList();
for (AggregationDescriptor descriptor : node.getAggregationDescriptorList()) {
List<InputLocation[]> inputLocationList = calcInputLocationList(descriptor, layout);
aggregators.add(
@@ -1136,6 +1182,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
inputLocationList));
}
boolean inputRaw = node.getAggregationDescriptorList().get(0).getStep().isInputRaw();
+ GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+
if (inputRaw) {
checkArgument(children.size() == 1, "rawDataAggregateOperator can only accept one input");
OperatorContext operatorContext =
@@ -1146,8 +1194,20 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
node.getPlanNodeId(),
RawDataAggregationOperator.class.getSimpleName());
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
+
+ ITimeRangeIterator timeRangeIterator =
+ initTimeRangeIterator(groupByTimeParameter, ascending, true);
+ long maxReturnSize =
+ calculateMaxAggregationResultSize(
+ aggregationDescriptors, timeRangeIterator, context.getTypeProvider());
+
return new RawDataAggregationOperator(
- operatorContext, aggregators, children.get(0), ascending, node.getGroupByTimeParameter());
+ operatorContext,
+ aggregators,
+ timeRangeIterator,
+ children.get(0),
+ ascending,
+ maxReturnSize);
} else {
OperatorContext operatorContext =
context
@@ -1156,9 +1216,16 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
AggregationOperator.class.getSimpleName());
+
+ ITimeRangeIterator timeRangeIterator =
+ initTimeRangeIterator(groupByTimeParameter, ascending, true);
+ long maxReturnSize =
+ calculateMaxAggregationResultSize(
+ aggregationDescriptors, timeRangeIterator, context.getTypeProvider());
+
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
return new AggregationOperator(
- operatorContext, aggregators, children, ascending, node.getGroupByTimeParameter(), true);
+ operatorContext, aggregators, timeRangeIterator, children, maxReturnSize);
}
}
@@ -1400,6 +1467,10 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
// last_time, last_value
List<Aggregator> aggregators = LastQueryUtil.createAggregators(seriesPath.getSeriesType());
+ ITimeRangeIterator timeRangeIterator = initTimeRangeIterator(null, false, false);
+ long maxReturnSize =
+ calculateMaxAggregationResultSizeForLastQuery(
+ aggregators, seriesPath.transformToPartialPath());
SeriesAggregationScanOperator seriesAggregationScanOperator =
new SeriesAggregationScanOperator(
@@ -1408,9 +1479,11 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()),
operatorContext,
aggregators,
+ timeRangeIterator,
context.getLastQueryTimeFilter(),
false,
- null);
+ null,
+ maxReturnSize);
context.addSourceOperator(seriesAggregationScanOperator);
context.addPath(seriesPath);
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
@@ -1479,15 +1552,22 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
// last_time, last_value
List<Aggregator> aggregators =
LastQueryUtil.createAggregators(seriesPath.getSchemaList().get(0).getType());
+ ITimeRangeIterator timeRangeIterator = initTimeRangeIterator(null, false, false);
+ long maxReturnSize =
+ calculateMaxAggregationResultSizeForLastQuery(
+ aggregators, seriesPath.transformToPartialPath());
+
AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
new AlignedSeriesAggregationScanOperator(
node.getPlanNodeId(),
seriesPath,
operatorContext,
aggregators,
+ timeRangeIterator,
context.getLastQueryTimeFilter(),
false,
- null);
+ null,
+ maxReturnSize);
context.addSourceOperator(seriesAggregationScanOperator);
context.addPath(seriesPath);
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/StatisticsManager.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/statistics/StatisticsManager.java
index d0eff60394..44d5fc1c66 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/StatisticsManager.java
@@ -17,27 +17,30 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution.memory;
+package org.apache.iotdb.db.mpp.statistics;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.commons.path.PartialPath;
-/**
- * Manages memory of a data node. The memory is divided into two memory pools so that the memory for
- * read and for write can be isolated.
- */
-public class LocalMemoryManager {
+import com.google.common.collect.Maps;
+
+import java.util.Map;
- private final MemoryPool queryPool;
+public class StatisticsManager {
- public LocalMemoryManager() {
- queryPool =
- new MemoryPool(
- "query",
- IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForRead(),
- (long) (IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForRead() * 0.5));
+ private final Map<PartialPath, TimeseriesStats> seriesToStatsMap = Maps.newConcurrentMap();
+
+ public long getMaxBinarySizeInBytes(PartialPath path) {
+ return 512 * Byte.BYTES;
}
- public MemoryPool getQueryPool() {
- return queryPool;
+ public static StatisticsManager getInstance() {
+ return StatisticsManager.StatisticsManagerHelper.INSTANCE;
+ }
+
+ private static class StatisticsManagerHelper {
+
+ private static final StatisticsManager INSTANCE = new StatisticsManager();
+
+ private StatisticsManagerHelper() {}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/TimeseriesStats.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/statistics/TimeseriesStats.java
index d0eff60394..509341d3d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/LocalMemoryManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/TimeseriesStats.java
@@ -17,27 +17,8 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution.memory;
+package org.apache.iotdb.db.mpp.statistics;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-
-/**
- * Manages memory of a data node. The memory is divided into two memory pools so that the memory for
- * read and for write can be isolated.
- */
-public class LocalMemoryManager {
-
- private final MemoryPool queryPool;
-
- public LocalMemoryManager() {
- queryPool =
- new MemoryPool(
- "query",
- IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForRead(),
- (long) (IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForRead() * 0.5));
- }
-
- public MemoryPool getQueryPool() {
- return queryPool;
- }
+public class TimeseriesStats {
+ // TODO collect time series statistics
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/multi/MappableUDFColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/multi/MappableUDFColumnTransformer.java
index 87cecc817a..d9693aa671 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/multi/MappableUDFColumnTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/multi/MappableUDFColumnTransformer.java
@@ -88,4 +88,8 @@ public class MappableUDFColumnTransformer extends ColumnTransformer {
protected void checkType() {
// do nothing
}
+
+  /**
+   * Returns the input column transformers of this transformer.
+   *
+   * @return the array held by this instance itself, not a copy
+   */
+  public ColumnTransformer[] getInputColumnTransformers() {
+    return this.inputColumnTransformers;
+  }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ternary/TernaryColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ternary/TernaryColumnTransformer.java
index 2032215b0f..096cef4d5e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ternary/TernaryColumnTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/ternary/TernaryColumnTransformer.java
@@ -41,4 +41,16 @@ public abstract class TernaryColumnTransformer extends ColumnTransformer {
this.thirdColumnTransformer = thirdColumnTransformer;
checkType();
}
+
+  /**
+   * Returns the first input of this ternary transformer.
+   *
+   * @return the first column transformer
+   */
+  public ColumnTransformer getFirstColumnTransformer() {
+    return this.firstColumnTransformer;
+  }
+
+  /**
+   * Returns the second input of this ternary transformer.
+   *
+   * @return the second column transformer
+   */
+  public ColumnTransformer getSecondColumnTransformer() {
+    return this.secondColumnTransformer;
+  }
+
+  /**
+   * Returns the third input of this ternary transformer.
+   *
+   * @return the third column transformer
+   */
+  public ColumnTransformer getThirdColumnTransformer() {
+    return this.thirdColumnTransformer;
+  }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/UnaryColumnTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/UnaryColumnTransformer.java
index d84b538cc6..c0682169b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/UnaryColumnTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/column/unary/UnaryColumnTransformer.java
@@ -42,6 +42,10 @@ public abstract class UnaryColumnTransformer extends ColumnTransformer {
initializeColumnCache(columnBuilder.build());
}
+  /**
+   * Returns the single input of this unary transformer.
+   *
+   * @return the child column transformer
+   */
+  public ColumnTransformer getChildColumnTransformer() {
+    return this.childColumnTransformer;
+  }
+
@Override
protected void checkType() {
// do nothing
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/TsFileResourceManager.java b/server/src/main/java/org/apache/iotdb/db/rescon/TsFileResourceManager.java
index ae6ab41567..b20e84216e 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/TsFileResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/TsFileResourceManager.java
@@ -35,8 +35,7 @@ public class TsFileResourceManager {
private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
/** threshold total memory for all TimeIndex */
- private double TIME_INDEX_MEMORY_THRESHOLD =
- CONFIG.getAllocateMemoryForRead() * CONFIG.getTimeIndexMemoryProportion();
+ private double TIME_INDEX_MEMORY_THRESHOLD = CONFIG.getAllocateMemoryForTimeIndex();
/** store the sealed TsFileResource, sorted by priority of TimeIndex */
private final TreeSet<TsFileResource> sealedTsFileResources =
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TimeSelector.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TimeSelector.java
index 6233fa04ce..e25f592aa8 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TimeSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TimeSelector.java
@@ -166,6 +166,11 @@ public class TimeSelector {
return smallerChildIndex;
}
+  /**
+   * Resets this selector: {@code heapSize} becomes zero and {@code lastTime} becomes {@link
+   * Long#MIN_VALUE}. Only these two fields are touched.
+   */
+  public void clear() {
+    this.lastTime = Long.MIN_VALUE;
+    this.heapSize = 0;
+  }
+
@Override
public String toString() {
return Arrays.toString(this.timeHeap);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java
index 88e4b6bf16..1d3f3ccf79 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/TimeRangeIteratorTest.java
@@ -295,6 +295,8 @@ public class TimeRangeIteratorTest {
}
private void checkRes(ITimeRangeIterator timeRangeIterator, String[] res) {
+ Assert.assertEquals(res.length, timeRangeIterator.getTotalIntervalNum());
+
boolean isAscending = timeRangeIterator.isAscending();
int cnt = isAscending ? 0 : res.length - 1;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
index 233965aa3e..85e65849a6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
@@ -61,6 +61,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
import static org.junit.Assert.assertEquals;
public class AggregationOperatorTest {
@@ -319,9 +321,11 @@ public class AggregationOperatorTest {
Collections.singleton("sensor0"),
fragmentInstanceContext.getOperatorContexts().get(0),
aggregators,
+ initTimeRangeIterator(groupByTimeParameter, true, true),
null,
true,
- groupByTimeParameter);
+ groupByTimeParameter,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
List<TsFileResource> seqResources1 = new ArrayList<>();
List<TsFileResource> unSeqResources1 = new ArrayList<>();
seqResources1.add(seqResources.get(0));
@@ -341,9 +345,11 @@ public class AggregationOperatorTest {
Collections.singleton("sensor0"),
fragmentInstanceContext.getOperatorContexts().get(1),
aggregators,
+ initTimeRangeIterator(groupByTimeParameter, true, true),
null,
true,
- groupByTimeParameter);
+ groupByTimeParameter,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
List<TsFileResource> seqResources2 = new ArrayList<>();
List<TsFileResource> unSeqResources2 = new ArrayList<>();
seqResources2.add(seqResources.get(2));
@@ -368,9 +374,8 @@ public class AggregationOperatorTest {
return new AggregationOperator(
fragmentInstanceContext.getOperatorContexts().get(2),
finalAggregators,
+ initTimeRangeIterator(groupByTimeParameter, true, true),
children,
- true,
- groupByTimeParameter,
- true);
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
index 2277299102..a2e989f47e 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
@@ -63,6 +63,8 @@ import java.util.stream.Collectors;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -621,10 +623,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
1, planNodeId, SeriesScanOperator.class.getSimpleName());
fragmentInstanceContext
.getOperatorContexts()
- .forEach(
- operatorContext -> {
- operatorContext.setMaxRunTime(TEST_TIME_SLICE);
- });
+ .forEach(operatorContext -> operatorContext.setMaxRunTime(TEST_TIME_SLICE));
AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
new AlignedSeriesAggregationScanOperator(
@@ -632,9 +631,11 @@ public class AlignedSeriesAggregationScanOperatorTest {
alignedPath,
fragmentInstanceContext.getOperatorContexts().get(0),
aggregators,
+ initTimeRangeIterator(groupByTimeParameter, ascending, true),
timeFilter,
ascending,
- groupByTimeParameter);
+ groupByTimeParameter,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
seriesAggregationScanOperator.initQueryDataSource(
new QueryDataSource(seqResources, unSeqResources));
return seriesAggregationScanOperator;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/FillOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/FillOperatorTest.java
index 5cbe7b189f..432967bb66 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/FillOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/FillOperatorTest.java
@@ -129,6 +129,21 @@ public class FillOperatorTest {
public boolean isFinished() {
return index >= 3;
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 0;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 0;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
});
int count = 0;
@@ -283,6 +298,21 @@ public class FillOperatorTest {
public boolean isFinished() {
return index >= 3;
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 0;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 0;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
});
int count = 0;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
index e44f6ce2b9..38d244a13f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
@@ -126,6 +126,21 @@ public class LastQueryMergeOperatorTest {
public boolean isFinished() {
return !hasNext();
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 0;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 0;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
};
Operator operator2 =
@@ -175,6 +190,21 @@ public class LastQueryMergeOperatorTest {
public boolean isFinished() {
return !hasNext();
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 0;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 0;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
};
LastQueryMergeOperator lastQueryMergeOperator =
@@ -292,6 +322,21 @@ public class LastQueryMergeOperatorTest {
public boolean isFinished() {
return !hasNext();
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 0;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 0;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
};
Operator operator2 =
@@ -341,6 +386,21 @@ public class LastQueryMergeOperatorTest {
public boolean isFinished() {
return !hasNext();
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 0;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 0;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
};
LastQueryMergeOperator lastQueryMergeOperator =
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
index d914ec3ee7..4864fed39b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
@@ -56,6 +56,8 @@ import java.util.concurrent.ExecutorService;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -133,9 +135,11 @@ public class LastQueryOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(0),
aggregators1,
+ initTimeRangeIterator(null, false, true),
null,
false,
- null);
+ null,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
seriesAggregationScanOperator1.initQueryDataSource(
new QueryDataSource(seqResources, unSeqResources));
@@ -155,9 +159,11 @@ public class LastQueryOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(2),
aggregators2,
+ initTimeRangeIterator(null, false, true),
null,
false,
- null);
+ null,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
seriesAggregationScanOperator2.initQueryDataSource(
new QueryDataSource(seqResources, unSeqResources));
@@ -252,9 +258,11 @@ public class LastQueryOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(0),
aggregators1,
+ initTimeRangeIterator(null, false, true),
null,
false,
- null);
+ null,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
seriesAggregationScanOperator1.initQueryDataSource(
new QueryDataSource(seqResources, unSeqResources));
@@ -274,9 +282,11 @@ public class LastQueryOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(2),
aggregators2,
+ initTimeRangeIterator(null, false, true),
null,
false,
- null);
+ null,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
seriesAggregationScanOperator2.initQueryDataSource(
new QueryDataSource(seqResources, unSeqResources));
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java
index 2785f1d803..6dc84c071b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java
@@ -58,6 +58,8 @@ import java.util.concurrent.ExecutorService;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -135,9 +137,11 @@ public class LastQuerySortOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(0),
aggregators1,
+ initTimeRangeIterator(null, false, true),
null,
false,
- null);
+ null,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
seriesAggregationScanOperator1.initQueryDataSource(
new QueryDataSource(seqResources, unSeqResources));
@@ -157,9 +161,11 @@ public class LastQuerySortOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(2),
aggregators2,
+ initTimeRangeIterator(null, false, true),
null,
false,
- null);
+ null,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
seriesAggregationScanOperator2.initQueryDataSource(
new QueryDataSource(seqResources, unSeqResources));
@@ -255,9 +261,11 @@ public class LastQuerySortOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(0),
aggregators1,
+ initTimeRangeIterator(null, false, true),
null,
false,
- null);
+ null,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
seriesAggregationScanOperator1.initQueryDataSource(
new QueryDataSource(seqResources, unSeqResources));
@@ -277,9 +285,11 @@ public class LastQuerySortOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(2),
aggregators2,
+ initTimeRangeIterator(null, false, true),
null,
false,
- null);
+ null,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
seriesAggregationScanOperator2.initQueryDataSource(
new QueryDataSource(seqResources, unSeqResources));
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
index 56c2fe6a1a..733694d4d5 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
@@ -162,6 +162,21 @@ public class LinearFillOperatorTest {
public boolean isFinished() {
return index >= 3;
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 0;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 0;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
});
int count = 0;
@@ -360,6 +375,21 @@ public class LinearFillOperatorTest {
public boolean isFinished() {
return index >= 3;
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 0;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 0;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
});
float[][][] res =
@@ -558,6 +588,21 @@ public class LinearFillOperatorTest {
public boolean isFinished() {
return index >= 3;
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 0;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 0;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
});
int count = 0;
@@ -756,6 +801,21 @@ public class LinearFillOperatorTest {
public boolean isFinished() {
return index >= 3;
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 0;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 0;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
});
int count = 0;
@@ -902,6 +962,21 @@ public class LinearFillOperatorTest {
public boolean isFinished() {
return index >= 7;
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 0;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 0;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
});
int count = 0;
@@ -1007,6 +1082,21 @@ public class LinearFillOperatorTest {
public boolean isFinished() {
return index >= 7;
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 0;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 0;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
});
int count = 0;
@@ -1112,6 +1202,21 @@ public class LinearFillOperatorTest {
public boolean isFinished() {
return index >= 7;
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 0;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 0;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
});
int count = 0;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
new file mode 100644
index 0000000000..247eb0fc32
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -0,0 +1,1542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.AggregationOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.FilterAndProjectOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.SortOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.RowBasedTimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryCollectOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQuerySortOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.UpdateLastCacheOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.CountMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.DevicesCountOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.DevicesSchemaScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.LevelTimeSeriesCountOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.NodeManageMemoryMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.NodePathsConvertOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.NodePathsCountOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.NodePathsSchemaScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.PathsUsingTemplateScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaFetchMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaFetchScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaQueryMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaQueryOrderByHeatOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.TimeSeriesCountOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.TimeSeriesSchemaScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.binary.ArithmeticAdditionColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.binary.CompareLessEqualColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.ConstantColumnTransformer;
+import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.TimeColumnTransformer;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.read.common.type.BooleanType;
+import org.apache.iotdb.tsfile.read.common.type.LongType;
+import org.apache.iotdb.tsfile.read.common.type.TypeEnum;
+
+import com.google.common.collect.Sets;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryMergeOperator.MAP_NODE_RETRAINED_SIZE;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class OperatorMemoryTest {
+
+ @Test
+ public void seriesScanOperatorTest() { // Checks the three memory estimates reported by a SeriesScanOperator built over one INT32 series.
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ try {
+ MeasurementPath measurementPath =
+ new MeasurementPath("root.SeriesScanOperatorTest.device0.sensor0", TSDataType.INT32);
+ Set<String> allSensors = Sets.newHashSet("sensor0");
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext( // registers the operator context that is fetched by index 0 below
+ 1, planNodeId, SeriesScanOperator.class.getSimpleName());
+
+ SeriesScanOperator seriesScanOperator =
+ new SeriesScanOperator(
+ planNodeId,
+ measurementPath,
+ allSensors,
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ null,
+ null,
+ true);
+
+ assertEquals( // the peak-memory estimate is expected to equal the configured TsFile page size
+ TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
+ seriesScanOperator.calculateMaxPeekMemory());
+ assertEquals( // the max-return-size estimate is expected to equal the same page size
+ TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
+ seriesScanOperator.calculateMaxReturnSize());
+ assertEquals(0, seriesScanOperator.calculateRetainedSizeAfterCallingNext()); // the retained-size estimate is expected to be 0
+
+ } catch (IllegalPathException e) { // an IllegalPathException is reported as a test failure
+ e.printStackTrace();
+ fail();
+ } finally {
+ instanceNotificationExecutor.shutdown(); // always shut down the executor created at the top of the test
+ }
+ }
+
+ @Test
+ public void alignedSeriesScanOperatorTest() { // Checks the three memory estimates reported by an AlignedSeriesScanOperator over a three-sensor aligned path.
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ try {
+ AlignedPath alignedPath =
+ new AlignedPath(
+ "root.AlignedSeriesScanOperatorTest.device0",
+ Arrays.asList("sensor0", "sensor1", "sensor2"));
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext( // registers the operator context that is fetched by index 0 below
+ 1, planNodeId, AlignedSeriesScanOperator.class.getSimpleName());
+
+ AlignedSeriesScanOperator seriesScanOperator =
+ new AlignedSeriesScanOperator(
+ planNodeId,
+ alignedPath,
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ null,
+ null,
+ true);
+
+ assertEquals( // NOTE(review): the factor 4 is presumably the 3 sensors plus the time column — confirm against the operator
+ 4 * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
+ seriesScanOperator.calculateMaxPeekMemory());
+ assertEquals( // the max-return-size estimate is expected to match the peak estimate above
+ 4 * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
+ seriesScanOperator.calculateMaxReturnSize());
+
+ assertEquals(0, seriesScanOperator.calculateRetainedSizeAfterCallingNext()); // the retained-size estimate is expected to be 0
+
+ } catch (IllegalPathException e) { // an IllegalPathException is reported as a test failure
+ e.printStackTrace();
+ fail();
+ } finally {
+ instanceNotificationExecutor.shutdown(); // always shut down the executor created at the top of the test
+ }
+ }
+
+ @Test
+ public void exchangeOperatorTest() { // Checks the memory estimates of an ExchangeOperator constructed with all-null arguments.
+ ExchangeOperator exchangeOperator = new ExchangeOperator(null, null, null);
+
+ assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, exchangeOperator.calculateMaxPeekMemory()); // peak estimate is expected to be the default max TsBlock size
+ assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, exchangeOperator.calculateMaxReturnSize()); // max return size is expected to be the same constant
+ assertEquals(0, exchangeOperator.calculateRetainedSizeAfterCallingNext()); // the retained-size estimate is expected to be 0
+ }
+
+ @Test
+ public void lastCacheScanOperatorTest() { // Checks that a LastCacheScanOperator's estimates follow the retained size of the TsBlock it is given.
+ TsBlock tsBlock = Mockito.mock(TsBlock.class);
+ Mockito.when(tsBlock.getRetainedSizeInBytes()).thenReturn(1024L); // the mocked block reports 1024 bytes
+ LastCacheScanOperator lastCacheScanOperator = new LastCacheScanOperator(null, null, tsBlock);
+
+ assertEquals(1024, lastCacheScanOperator.calculateMaxPeekMemory()); // peak estimate is expected to equal the stubbed block size
+ assertEquals(1024, lastCacheScanOperator.calculateMaxReturnSize()); // max return size is expected to equal the stubbed block size
+ assertEquals(0, lastCacheScanOperator.calculateRetainedSizeAfterCallingNext()); // the retained-size estimate is expected to be 0
+ }
+
+ @Test
+ public void fillOperatorTest() { // Checks how a FillOperator derives its memory estimates from those of a single mocked child.
+ Operator child = Mockito.mock(Operator.class);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L); // child peak estimate
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L); // child max return size
+ Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L); // child retained size
+
+ FillOperator fillOperator =
+ new FillOperator(Mockito.mock(OperatorContext.class), new IFill[] {null, null}, child);
+
+ assertEquals(2048 * 2 + 512, fillOperator.calculateMaxPeekMemory()); // expected: twice the child's peak plus the child's retained size
+ assertEquals(1024, fillOperator.calculateMaxReturnSize()); // expected: the child's max return size
+ assertEquals(512, fillOperator.calculateRetainedSizeAfterCallingNext()); // expected: the child's retained size
+ }
+
+ @Test
+ public void lastQueryCollectOperatorTest() { // Checks that a LastQueryCollectOperator reports the max of its children's peak/return estimates and the sum of their retained sizes.
+ List<Operator> children = new ArrayList<>(4);
+ Random random = new Random(); // unseeded, so the stubbed sizes differ between runs
+ long expectedMaxPeekMemory = 0;
+ long expectedMaxReturnSize = 0;
+ for (int i = 0; i < 4; i++) {
+ Operator child = Mockito.mock(Operator.class);
+ long currentMaxPeekMemory = random.nextInt(1024) + 1024; // in [1024, 2048)
+ long currentMaxReturnSize = random.nextInt(1024); // in [0, 1024)
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(currentMaxPeekMemory);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(currentMaxReturnSize);
+ Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
+ children.add(child);
+ expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, currentMaxPeekMemory); // running max over the children
+ expectedMaxReturnSize = Math.max(expectedMaxReturnSize, currentMaxReturnSize); // running max over the children
+ }
+ LastQueryCollectOperator lastQueryCollectOperator =
+ new LastQueryCollectOperator(Mockito.mock(OperatorContext.class), children);
+
+ assertEquals(expectedMaxPeekMemory, lastQueryCollectOperator.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize, lastQueryCollectOperator.calculateMaxReturnSize());
+ assertEquals(4 * 512, lastQueryCollectOperator.calculateRetainedSizeAfterCallingNext()); // expected: sum of the four children's retained sizes
+ }
+
+ @Test
+ public void lastQueryMergeOperatorTest() { // Checks a LastQueryMergeOperator's memory estimates against values recomputed here from four mocked children.
+ List<Operator> children = new ArrayList<>(4);
+ Random random = new Random(); // unseeded, so the stubbed sizes differ between runs
+ long expectedMaxPeekMemory = 0;
+ long temp = 0; // running sum of (max return size + retained size) over the children visited so far
+ long expectedMaxReturnSize = 0;
+ long childSumReturnSize = 0;
+ long minReturnSize = Long.MAX_VALUE;
+ for (int i = 0; i < 4; i++) {
+ Operator child = Mockito.mock(Operator.class);
+ long currentMaxPeekMemory = random.nextInt(1024) + 1024; // in [1024, 2048)
+ long currentMaxReturnSize = random.nextInt(1024); // in [0, 1024)
+ minReturnSize = Math.min(minReturnSize, currentMaxReturnSize);
+ childSumReturnSize += currentMaxReturnSize;
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(currentMaxPeekMemory);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(currentMaxReturnSize);
+ Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
+ children.add(child);
+ expectedMaxReturnSize = Math.max(expectedMaxReturnSize, currentMaxReturnSize);
+ expectedMaxPeekMemory = // candidate peak: what earlier children account for plus this child's own peak
+ Math.max(expectedMaxPeekMemory, temp + child.calculateMaxPeekMemory());
+ temp += (child.calculateMaxReturnSize() + child.calculateRetainedSizeAfterCallingNext());
+ }
+ // We need to cache all the TsBlocks of the children and then return a new TsBlock as the
+ // result, whose maximum possible size should equal the max return size among all children;
+ // we should also take the TreeMap memory into account.
+ expectedMaxPeekMemory =
+ Math.max(
+ expectedMaxPeekMemory,
+ temp
+ + expectedMaxReturnSize
+ + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()
+ * MAP_NODE_RETRAINED_SIZE);
+
+ LastQueryMergeOperator lastQueryMergeOperator =
+ new LastQueryMergeOperator(
+ Mockito.mock(OperatorContext.class), children, Comparator.naturalOrder());
+
+ assertEquals(expectedMaxPeekMemory, lastQueryMergeOperator.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize, lastQueryMergeOperator.calculateMaxReturnSize());
+ assertEquals( // expected: all child return sizes except the smallest, plus every child's retained size, plus the map term
+ childSumReturnSize
+ - minReturnSize
+ + 4 * 512
+ + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()
+ * MAP_NODE_RETRAINED_SIZE,
+ lastQueryMergeOperator.calculateRetainedSizeAfterCallingNext());
+ }
+
+ @Test
+ public void lastQueryOperatorTest() { // Checks a LastQueryOperator's memory estimates for two different retained sizes of its TsBlockBuilder.
+ TsBlockBuilder builder = Mockito.mock(TsBlockBuilder.class);
+ Mockito.when(builder.getRetainedSizeInBytes()).thenReturn(1024L); // first scenario: the builder reports 1024 bytes
+ List<UpdateLastCacheOperator> children = new ArrayList<>(4);
+ long expectedMaxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ for (int i = 0; i < 4; i++) {
+ UpdateLastCacheOperator child = Mockito.mock(UpdateLastCacheOperator.class);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2 * 1024 * 1024L);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+ Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
+ children.add(child);
+ expectedMaxReturnSize = Math.max(expectedMaxReturnSize, 1024L); // never drops below the default max TsBlock size
+ }
+ LastQueryOperator lastQueryOperator =
+ new LastQueryOperator(Mockito.mock(OperatorContext.class), children, builder);
+
+ assertEquals( // expected: the default max TsBlock size plus the children's stubbed peak of 2 * 1024 * 1024
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + 2 * 1024 * 1024L,
+ lastQueryOperator.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize, lastQueryOperator.calculateMaxReturnSize());
+ assertEquals(4 * 512L, lastQueryOperator.calculateRetainedSizeAfterCallingNext()); // expected: sum of the four children's retained sizes
+
+ Mockito.when(builder.getRetainedSizeInBytes()).thenReturn(4 * 1024 * 1024L); // second scenario: re-stub the builder to report 4 * 1024 * 1024 bytes
+ assertEquals(4 * 1024 * 1024L + 2 * 1024 * 1024L, lastQueryOperator.calculateMaxPeekMemory()); // the builder size now takes the place of the default constant
+ assertEquals(4 * 1024 * 1024L, lastQueryOperator.calculateMaxReturnSize());
+ assertEquals(4 * 512L, lastQueryOperator.calculateRetainedSizeAfterCallingNext()); // unchanged by the re-stub
+ }
+
+ @Test
+ public void lastQuerySortOperatorTest() { // Checks a LastQuerySortOperator's memory estimates given a mocked cached TsBlock and four mocked children.
+ TsBlock tsBlock = Mockito.mock(TsBlock.class);
+ Mockito.when(tsBlock.getRetainedSizeInBytes()).thenReturn(16 * 1024L); // the mocked block reports 16 * 1024 bytes
+ Mockito.when(tsBlock.getPositionCount()).thenReturn(16); // and 16 positions
+ List<UpdateLastCacheOperator> children = new ArrayList<>(4);
+
+ for (int i = 0; i < 4; i++) {
+ UpdateLastCacheOperator child = Mockito.mock(UpdateLastCacheOperator.class);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2 * 1024L);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+ Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
+ children.add(child);
+ }
+
+ LastQuerySortOperator lastQuerySortOperator =
+ new LastQuerySortOperator(
+ Mockito.mock(OperatorContext.class), tsBlock, children, Comparator.naturalOrder());
+
+ assertEquals( // expected: default max TsBlock size + the mocked block's retained size + the children's stubbed peak
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + tsBlock.getRetainedSizeInBytes() + 2 * 1024L,
+ lastQuerySortOperator.calculateMaxPeekMemory());
+ assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, lastQuerySortOperator.calculateMaxReturnSize());
+ assertEquals( // NOTE(review): the 1024L term presumably is 16 * 1024 bytes / 16 positions (one row) — confirm against the operator
+ 16 * 1024L + 1024L + 4 * 512L,
+ lastQuerySortOperator.calculateRetainedSizeAfterCallingNext());
+ }
+
+ @Test
+ public void limitOperatorTest() { // Checks that a LimitOperator reports exactly the memory estimates of its single mocked child.
+ Operator child = Mockito.mock(Operator.class);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2 * 1024L); // child peak estimate
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L); // child max return size
+ Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L); // child retained size
+
+ LimitOperator limitOperator =
+ new LimitOperator(Mockito.mock(OperatorContext.class), 100, child);
+
+ assertEquals(2 * 1024L, limitOperator.calculateMaxPeekMemory()); // same as the child's stubbed value
+ assertEquals(1024, limitOperator.calculateMaxReturnSize()); // same as the child's stubbed value
+ assertEquals(512, limitOperator.calculateRetainedSizeAfterCallingNext()); // same as the child's stubbed value
+ }
+
+ @Test
+ public void offsetOperatorTest() { // Checks that an OffsetOperator reports exactly the memory estimates of its single mocked child.
+ Operator child = Mockito.mock(Operator.class);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2 * 1024L); // child peak estimate
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L); // child max return size
+ Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L); // child retained size
+
+ OffsetOperator offsetOperator =
+ new OffsetOperator(Mockito.mock(OperatorContext.class), 100, child);
+
+ assertEquals(2 * 1024L, offsetOperator.calculateMaxPeekMemory()); // same as the child's stubbed value
+ assertEquals(1024, offsetOperator.calculateMaxReturnSize()); // same as the child's stubbed value
+ assertEquals(512, offsetOperator.calculateRetainedSizeAfterCallingNext()); // same as the child's stubbed value
+ }
+
+ /**
+  * Checks the memory estimates of a RowBasedTimeJoinOperator with four mocked children and two
+  * INT32 output columns.
+  *
+  * <p>The expected peak is the larger of (64 KiB per child plus the expected return size) and the
+  * running maximum of (64 KiB per earlier child plus the current child's stubbed peak).
+  */
+ @Test
+ public void rowBasedTimeJoinOperatorTest() {
+   List<TSDataType> outputTypes = new ArrayList<>(Collections.nCopies(2, TSDataType.INT32));
+
+   long accumulatedReturnSize = 0;
+   long maxPeekWhileFetching = 0;
+   List<Operator> inputs = new ArrayList<>(4);
+   for (int index = 0; index < 4; index++) {
+     Operator input = Mockito.mock(Operator.class);
+     Mockito.when(input.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+     Mockito.when(input.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+     Mockito.when(input.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+     inputs.add(input);
+
+     long peekWithThisInput = accumulatedReturnSize + input.calculateMaxPeekMemory();
+     maxPeekWhileFetching = Math.max(maxPeekWhileFetching, peekWithThisInput);
+     accumulatedReturnSize += 64 * 1024L;
+   }
+
+   long expectedMaxReturnSize =
+       3L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+   long expectedMaxPeekMemory =
+       Math.max(accumulatedReturnSize + expectedMaxReturnSize, maxPeekWhileFetching);
+
+   RowBasedTimeJoinOperator operator =
+       new RowBasedTimeJoinOperator(
+           Mockito.mock(OperatorContext.class), inputs, Ordering.ASC, outputTypes, null, null);
+
+   assertEquals(expectedMaxPeekMemory, operator.calculateMaxPeekMemory());
+   assertEquals(expectedMaxReturnSize, operator.calculateMaxReturnSize());
+   assertEquals(3 * 64 * 1024L, operator.calculateRetainedSizeAfterCallingNext());
+ }
+
+ /** Checks that a default-constructed SortOperator reports zero for all three memory estimates. */
+ @Test
+ public void sortOperatorTest() {
+   SortOperator operator = new SortOperator();
+
+   assertEquals(0L, operator.calculateMaxPeekMemory());
+   assertEquals(0L, operator.calculateMaxReturnSize());
+   assertEquals(0L, operator.calculateRetainedSizeAfterCallingNext());
+ }
+
+ /**
+  * Checks the memory estimates of a TimeJoinOperator with four mocked children and two INT32
+  * output columns.
+  *
+  * <p>The expected peak is the larger of (64 KiB per child plus the expected return size) and the
+  * running maximum of (64 KiB per earlier child plus the current child's stubbed peak).
+  */
+ @Test
+ public void timeJoinOperatorTest() {
+   List<TSDataType> outputTypes = new ArrayList<>(Collections.nCopies(2, TSDataType.INT32));
+
+   long accumulatedReturnSize = 0;
+   long maxPeekWhileFetching = 0;
+   List<Operator> inputs = new ArrayList<>(4);
+   for (int index = 0; index < 4; index++) {
+     Operator input = Mockito.mock(Operator.class);
+     Mockito.when(input.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+     Mockito.when(input.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+     Mockito.when(input.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+     inputs.add(input);
+
+     long peekWithThisInput = accumulatedReturnSize + input.calculateMaxPeekMemory();
+     maxPeekWhileFetching = Math.max(maxPeekWhileFetching, peekWithThisInput);
+     accumulatedReturnSize += 64 * 1024L;
+   }
+
+   long expectedMaxReturnSize =
+       3L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+   long expectedMaxPeekMemory =
+       Math.max(accumulatedReturnSize + expectedMaxReturnSize, maxPeekWhileFetching);
+
+   TimeJoinOperator operator =
+       new TimeJoinOperator(
+           Mockito.mock(OperatorContext.class), inputs, Ordering.ASC, outputTypes, null, null);
+
+   assertEquals(expectedMaxPeekMemory, operator.calculateMaxPeekMemory());
+   assertEquals(expectedMaxReturnSize, operator.calculateMaxReturnSize());
+   assertEquals(3 * 64 * 1024L, operator.calculateRetainedSizeAfterCallingNext());
+ }
+
+ /**
+  * Wraps a mocked child in an UpdateLastCacheOperator and checks that the operator's three memory
+  * estimates equal the values stubbed on that child.
+  */
+ @Test
+ public void updateLastCacheOperatorTest() {
+   final long stubbedPeekMemory = 2048L;
+   final long stubbedReturnSize = 1024L;
+   final long stubbedRetainedSize = 512L;
+
+   Operator source = Mockito.mock(Operator.class);
+   Mockito.when(source.calculateMaxPeekMemory()).thenReturn(stubbedPeekMemory);
+   Mockito.when(source.calculateMaxReturnSize()).thenReturn(stubbedReturnSize);
+   Mockito.when(source.calculateRetainedSizeAfterCallingNext()).thenReturn(stubbedRetainedSize);
+
+   // Only the child, the data type and the trailing flag are supplied; the rest are null.
+   UpdateLastCacheOperator operator =
+       new UpdateLastCacheOperator(null, source, null, TSDataType.BOOLEAN, null, true);
+
+   assertEquals(stubbedPeekMemory, operator.calculateMaxPeekMemory());
+   assertEquals(stubbedReturnSize, operator.calculateMaxReturnSize());
+   assertEquals(stubbedRetainedSize, operator.calculateRetainedSizeAfterCallingNext());
+ }
+
+ @Test
+ public void linearFillOperatorTest() {
+ Operator child = Mockito.mock(Operator.class);
+ Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L);
+ Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+ Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
+
+ LinearFillOperator linearFillOperator =
+ new LinearFillOperator(
+ Mockito.mock(OperatorContext.class), new LinearFill[] {null, null}, child);
+
+ assertEquals(2048 * 3 + 512L, linearFillOperator.calculateMaxPeekMemory());
+ assertEquals(1024, linearFillOperator.calculateMaxReturnSize());
+ assertEquals(512, linearFillOperator.calculateRetainedSizeAfterCallingNext());
+ }
+
+ /**
+  * Checks the memory estimates of a DeviceMergeOperator over four devices, four mocked children
+  * and two INT32 output columns.
+  *
+  * <p>Expected peak: the larger of (one page plus 128 KiB per child) and the largest stubbed child
+  * peak. Expected return size: three pages. Expected retained size: 128 KiB per child minus 64 KiB.
+  */
+ @Test
+ public void deviceMergeOperatorTest() {
+   List<TSDataType> outputTypes = new ArrayList<>(Collections.nCopies(2, TSDataType.INT32));
+   List<String> deviceNames =
+       new ArrayList<>(Arrays.asList("device1", "device2", "device3", "device4"));
+
+   long pageSize = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+   long expectedMaxReturnSize = 3L * pageSize;
+   long summedPeekMemory = pageSize;
+   long summedRetainedSize = 0;
+   long largestChildPeek = 0;
+
+   List<Operator> inputs = new ArrayList<>(4);
+   for (int index = 0; index < 4; index++) {
+     Operator input = Mockito.mock(Operator.class);
+     Mockito.when(input.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+     Mockito.when(input.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+     Mockito.when(input.calculateRetainedSizeAfterCallingNext()).thenReturn(64 * 1024L);
+     inputs.add(input);
+
+     summedPeekMemory += 128 * 1024L;
+     largestChildPeek = Math.max(largestChildPeek, input.calculateMaxPeekMemory());
+     summedRetainedSize += 128 * 1024L;
+   }
+   long expectedMaxPeekMemory = Math.max(summedPeekMemory, largestChildPeek);
+
+   DeviceMergeOperator operator =
+       new DeviceMergeOperator(
+           Mockito.mock(OperatorContext.class),
+           deviceNames,
+           inputs,
+           outputTypes,
+           Mockito.mock(TimeSelector.class),
+           Mockito.mock(TimeComparator.class));
+
+   assertEquals(expectedMaxPeekMemory, operator.calculateMaxPeekMemory());
+   assertEquals(expectedMaxReturnSize, operator.calculateMaxReturnSize());
+   assertEquals(
+       summedRetainedSize - 64 * 1024L, operator.calculateRetainedSizeAfterCallingNext());
+ }
+
+ /**
+  * Checks the memory estimates of a DeviceViewOperator over four devices, four mocked children
+  * and two INT32 output columns.
+  *
+  * <p>Expected peak: the larger of (two pages plus 1 KiB per child) and the largest stubbed child
+  * peak. Expected return size: two pages. Expected retained size: 1 KiB per child.
+  */
+ @Test
+ public void deviceViewOperatorTest() {
+   List<TSDataType> outputTypes = new ArrayList<>(Collections.nCopies(2, TSDataType.INT32));
+   List<String> deviceNames =
+       new ArrayList<>(Arrays.asList("device1", "device2", "device3", "device4"));
+
+   long expectedMaxReturnSize =
+       2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+   long summedPeekMemory = expectedMaxReturnSize;
+   long summedRetainedSize = 0;
+   long largestChildPeek = 0;
+
+   List<Operator> inputs = new ArrayList<>(4);
+   for (int index = 0; index < 4; index++) {
+     Operator input = Mockito.mock(Operator.class);
+     Mockito.when(input.calculateMaxPeekMemory()).thenReturn(1024L);
+     Mockito.when(input.calculateMaxReturnSize()).thenReturn(1024L);
+     Mockito.when(input.calculateRetainedSizeAfterCallingNext()).thenReturn(1024L);
+     inputs.add(input);
+
+     summedPeekMemory += 1024L;
+     largestChildPeek = Math.max(largestChildPeek, input.calculateMaxPeekMemory());
+     summedRetainedSize += 1024L;
+   }
+   long expectedMaxPeekMemory = Math.max(summedPeekMemory, largestChildPeek);
+
+   DeviceViewOperator operator =
+       new DeviceViewOperator(
+           Mockito.mock(OperatorContext.class),
+           deviceNames,
+           inputs,
+           new ArrayList<>(),
+           outputTypes);
+
+   assertEquals(expectedMaxPeekMemory, operator.calculateMaxPeekMemory());
+   assertEquals(expectedMaxReturnSize, operator.calculateMaxReturnSize());
+   assertEquals(summedRetainedSize, operator.calculateRetainedSizeAfterCallingNext());
+ }
+
+ /**
+  * Checks the memory estimates of a FilterAndProjectOperator built from a mocked child, a
+  * CompareLessEqualColumnTransformer as the filter and a single
+  * ArithmeticAdditionColumnTransformer as the projection.
+  *
+  * <p>Expected peak: four pages plus 512 bytes. Expected return size: two pages. Expected retained
+  * size: the child's stubbed 512 bytes.
+  */
+ @Test
+ public void filterAndProjectOperatorTest() {
+   Operator source = Mockito.mock(Operator.class);
+   Mockito.when(source.calculateMaxPeekMemory()).thenReturn(2048L);
+   Mockito.when(source.calculateMaxReturnSize()).thenReturn(1024L);
+   Mockito.when(source.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
+
+   // Mocked type descriptors shared by the filter and projection transformers.
+   BooleanType booleanType = Mockito.mock(BooleanType.class);
+   Mockito.when(booleanType.getTypeEnum()).thenReturn(TypeEnum.BOOLEAN);
+   LongType longType = Mockito.mock(LongType.class);
+   Mockito.when(longType.getTypeEnum()).thenReturn(TypeEnum.INT64);
+
+   // Filter: less-or-equal comparison between the time column and a constant column.
+   TimeColumnTransformer filterTime = new TimeColumnTransformer(longType);
+   ConstantColumnTransformer filterConstant =
+       new ConstantColumnTransformer(longType, Mockito.mock(IntColumn.class));
+   ColumnTransformer filterTransformer =
+       new CompareLessEqualColumnTransformer(booleanType, filterTime, filterConstant);
+
+   List<TSDataType> filterOutputTypes =
+       new ArrayList<>(Arrays.asList(TSDataType.INT32, TSDataType.INT64));
+
+   // Projection: addition of the time column and a constant column.
+   TimeColumnTransformer projectTime = new TimeColumnTransformer(longType);
+   ConstantColumnTransformer projectConstant =
+       new ConstantColumnTransformer(longType, Mockito.mock(IntColumn.class));
+   List<ColumnTransformer> projectTransformers = new ArrayList<>();
+   projectTransformers.add(
+       new ArithmeticAdditionColumnTransformer(booleanType, projectTime, projectConstant));
+
+   FilterAndProjectOperator operator =
+       new FilterAndProjectOperator(
+           Mockito.mock(OperatorContext.class),
+           source,
+           filterOutputTypes,
+           new ArrayList<>(),
+           filterTransformer,
+           new ArrayList<>(),
+           new ArrayList<>(),
+           projectTransformers,
+           false,
+           true);
+
+   assertEquals(
+       4L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() + 512L,
+       operator.calculateMaxPeekMemory());
+   assertEquals(
+       2 * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
+       operator.calculateMaxReturnSize());
+   assertEquals(512L, operator.calculateRetainedSizeAfterCallingNext());
+ }
+
+ /**
+  * Builds a TimeSeriesSchemaScanOperator on a stub fragment instance context and checks its
+  * memory estimates: peak and return size equal DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES and the retained
+  * size is 0.
+  */
+ @Test
+ public void TimeSeriesSchemaScanOperatorTest() {
+   ExecutorService notificationExecutor =
+       IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+   try {
+     FragmentInstanceId stubInstanceId =
+         new FragmentInstanceId(new PlanFragmentId(new QueryId("stub_query"), 0), "stub-instance");
+     FragmentInstanceContext instanceContext =
+         createFragmentInstanceContext(
+             stubInstanceId,
+             new FragmentInstanceStateMachine(stubInstanceId, notificationExecutor));
+
+     // NOTE(review): the context is registered under SeriesScanOperator's simple name although a
+     // TimeSeriesSchemaScanOperator is built below - looks like a copy-paste label; confirm.
+     PlanNodeId nodeId = new PlanNodeId("1");
+     instanceContext.addOperatorContext(1, nodeId, SeriesScanOperator.class.getSimpleName());
+
+     TimeSeriesSchemaScanOperator scanOperator =
+         new TimeSeriesSchemaScanOperator(
+             nodeId,
+             instanceContext.getOperatorContexts().get(0),
+             0,
+             0,
+             null,
+             null,
+             null,
+             false,
+             false,
+             false,
+             null);
+
+     assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, scanOperator.calculateMaxPeekMemory());
+     assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, scanOperator.calculateMaxReturnSize());
+     assertEquals(0L, scanOperator.calculateRetainedSizeAfterCallingNext());
+   } finally {
+     // Always release the single-thread pool created for the state machine.
+     notificationExecutor.shutdown();
+   }
+ }
+
+ /**
+  * Builds a DevicesSchemaScanOperator on a stub fragment instance context and checks its memory
+  * estimates: peak and return size equal DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES and the retained size
+  * is 0.
+  */
+ @Test
+ public void DeviceSchemaScanOperatorTest() {
+   ExecutorService notificationExecutor =
+       IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+   try {
+     FragmentInstanceId stubInstanceId =
+         new FragmentInstanceId(new PlanFragmentId(new QueryId("stub_query"), 0), "stub-instance");
+     FragmentInstanceContext instanceContext =
+         createFragmentInstanceContext(
+             stubInstanceId,
+             new FragmentInstanceStateMachine(stubInstanceId, notificationExecutor));
+
+     // NOTE(review): the context is registered under SeriesScanOperator's simple name although a
+     // DevicesSchemaScanOperator is built below - looks like a copy-paste label; confirm.
+     PlanNodeId nodeId = new PlanNodeId("1");
+     instanceContext.addOperatorContext(1, nodeId, SeriesScanOperator.class.getSimpleName());
+
+     DevicesSchemaScanOperator scanOperator =
+         new DevicesSchemaScanOperator(
+             nodeId, instanceContext.getOperatorContexts().get(0), 0, 0, null, false, false);
+
+     assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, scanOperator.calculateMaxPeekMemory());
+     assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, scanOperator.calculateMaxReturnSize());
+     assertEquals(0L, scanOperator.calculateRetainedSizeAfterCallingNext());
+   } finally {
+     // Always release the single-thread pool created for the state machine.
+     notificationExecutor.shutdown();
+   }
+ }
+
+ /**
+  * Builds a PathsUsingTemplateScanOperator on a stub fragment instance context and checks its
+  * memory estimates: peak and return size equal DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES and the retained
+  * size is 0.
+  */
+ @Test
+ public void PathsUsingTemplateScanOperatorTest() {
+   ExecutorService notificationExecutor =
+       IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+   try {
+     FragmentInstanceId stubInstanceId =
+         new FragmentInstanceId(new PlanFragmentId(new QueryId("stub_query"), 0), "stub-instance");
+     FragmentInstanceContext instanceContext =
+         createFragmentInstanceContext(
+             stubInstanceId,
+             new FragmentInstanceStateMachine(stubInstanceId, notificationExecutor));
+
+     // NOTE(review): the context is registered under SeriesScanOperator's simple name although a
+     // PathsUsingTemplateScanOperator is built below - looks like a copy-paste label; confirm.
+     PlanNodeId nodeId = new PlanNodeId("1");
+     instanceContext.addOperatorContext(1, nodeId, SeriesScanOperator.class.getSimpleName());
+
+     PathsUsingTemplateScanOperator scanOperator =
+         new PathsUsingTemplateScanOperator(
+             nodeId, instanceContext.getOperatorContexts().get(0), 0);
+
+     assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, scanOperator.calculateMaxPeekMemory());
+     assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, scanOperator.calculateMaxReturnSize());
+     assertEquals(0L, scanOperator.calculateRetainedSizeAfterCallingNext());
+   } finally {
+     // Always release the single-thread pool created for the state machine.
+     notificationExecutor.shutdown();
+   }
+ }
+
+ /**
+  * Builds a TimeSeriesCountOperator on a stub fragment instance context and checks its memory
+  * estimates: peak and return size are 4 bytes and the retained size is 0.
+  */
+ @Test
+ public void TimeSeriesCountOperatorTest() {
+   ExecutorService notificationExecutor =
+       IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+   try {
+     FragmentInstanceId stubInstanceId =
+         new FragmentInstanceId(new PlanFragmentId(new QueryId("stub_query"), 0), "stub-instance");
+     FragmentInstanceContext instanceContext =
+         createFragmentInstanceContext(
+             stubInstanceId,
+             new FragmentInstanceStateMachine(stubInstanceId, notificationExecutor));
+
+     // NOTE(review): the context is registered under SeriesScanOperator's simple name although a
+     // TimeSeriesCountOperator is built below - looks like a copy-paste label; confirm.
+     PlanNodeId nodeId = new PlanNodeId("1");
+     instanceContext.addOperatorContext(1, nodeId, SeriesScanOperator.class.getSimpleName());
+
+     TimeSeriesCountOperator countOperator =
+         new TimeSeriesCountOperator(
+             nodeId,
+             instanceContext.getOperatorContexts().get(0),
+             null,
+             false,
+             null,
+             null,
+             false);
+
+     assertEquals(4L, countOperator.calculateMaxPeekMemory());
+     assertEquals(4L, countOperator.calculateMaxReturnSize());
+     assertEquals(0L, countOperator.calculateRetainedSizeAfterCallingNext());
+   } finally {
+     // Always release the single-thread pool created for the state machine.
+     notificationExecutor.shutdown();
+   }
+ }
+
+ /**
+  * Builds a LevelTimeSeriesCountOperator on a stub fragment instance context and checks its
+  * memory estimates: peak and return size equal DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES and the retained
+  * size is 0.
+  */
+ @Test
+ public void LevelTimeSeriesCountOperatorTest() {
+   ExecutorService notificationExecutor =
+       IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+   try {
+     FragmentInstanceId stubInstanceId =
+         new FragmentInstanceId(new PlanFragmentId(new QueryId("stub_query"), 0), "stub-instance");
+     FragmentInstanceContext instanceContext =
+         createFragmentInstanceContext(
+             stubInstanceId,
+             new FragmentInstanceStateMachine(stubInstanceId, notificationExecutor));
+
+     // NOTE(review): the context is registered under SeriesScanOperator's simple name although a
+     // LevelTimeSeriesCountOperator is built below - looks like a copy-paste label; confirm.
+     PlanNodeId nodeId = new PlanNodeId("1");
+     instanceContext.addOperatorContext(1, nodeId, SeriesScanOperator.class.getSimpleName());
+
+     LevelTimeSeriesCountOperator countOperator =
+         new LevelTimeSeriesCountOperator(
+             nodeId,
+             instanceContext.getOperatorContexts().get(0),
+             null,
+             false,
+             4,
+             null,
+             null,
+             false);
+
+     assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, countOperator.calculateMaxPeekMemory());
+     assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, countOperator.calculateMaxReturnSize());
+     assertEquals(0L, countOperator.calculateRetainedSizeAfterCallingNext());
+   } finally {
+     // Always release the single-thread pool created for the state machine.
+     notificationExecutor.shutdown();
+   }
+ }
+
+ /**
+  * Builds a DevicesCountOperator on a stub fragment instance context and checks its memory
+  * estimates: peak and return size are 4 bytes and the retained size is 0.
+  */
+ @Test
+ public void DevicesCountOperatorTest() {
+   ExecutorService notificationExecutor =
+       IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+   try {
+     FragmentInstanceId stubInstanceId =
+         new FragmentInstanceId(new PlanFragmentId(new QueryId("stub_query"), 0), "stub-instance");
+     FragmentInstanceContext instanceContext =
+         createFragmentInstanceContext(
+             stubInstanceId,
+             new FragmentInstanceStateMachine(stubInstanceId, notificationExecutor));
+
+     // NOTE(review): the context is registered under SeriesScanOperator's simple name although a
+     // DevicesCountOperator is built below - looks like a copy-paste label; confirm.
+     PlanNodeId nodeId = new PlanNodeId("1");
+     instanceContext.addOperatorContext(1, nodeId, SeriesScanOperator.class.getSimpleName());
+
+     DevicesCountOperator countOperator =
+         new DevicesCountOperator(
+             nodeId, instanceContext.getOperatorContexts().get(0), null, false);
+
+     assertEquals(4L, countOperator.calculateMaxPeekMemory());
+     assertEquals(4L, countOperator.calculateMaxReturnSize());
+     assertEquals(0L, countOperator.calculateRetainedSizeAfterCallingNext());
+   } finally {
+     // Always release the single-thread pool created for the state machine.
+     notificationExecutor.shutdown();
+   }
+ }
+
+ /**
+  * Checks the memory estimates of a SchemaQueryMergeOperator over four mocked children: peak and
+  * return size are expected to be the maximum over the children, retained size their sum.
+  */
+ @Test
+ public void SchemaQueryMergeOperatorTest() {
+   QueryId stubQueryId = new QueryId("stub_query");
+
+   long largestPeekMemory = 0;
+   long largestReturnSize = 0;
+   long totalRetainedSize = 0;
+
+   List<Operator> inputs = new ArrayList<>(4);
+   for (int index = 0; index < 4; index++) {
+     Operator input = Mockito.mock(Operator.class);
+     Mockito.when(input.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+     Mockito.when(input.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+     Mockito.when(input.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+     inputs.add(input);
+
+     largestPeekMemory = Math.max(largestPeekMemory, input.calculateMaxPeekMemory());
+     largestReturnSize = Math.max(largestReturnSize, input.calculateMaxReturnSize());
+     totalRetainedSize += input.calculateRetainedSizeAfterCallingNext();
+   }
+
+   SchemaQueryMergeOperator mergeOperator =
+       new SchemaQueryMergeOperator(
+           stubQueryId.genPlanNodeId(), Mockito.mock(OperatorContext.class), inputs);
+
+   assertEquals(largestPeekMemory, mergeOperator.calculateMaxPeekMemory());
+   assertEquals(largestReturnSize, mergeOperator.calculateMaxReturnSize());
+   assertEquals(totalRetainedSize, mergeOperator.calculateRetainedSizeAfterCallingNext());
+ }
+
+ /**
+  * Checks the memory estimates of a CountMergeOperator over four mocked children: peak and return
+  * size are expected to be the maximum over the children, retained size their sum.
+  */
+ @Test
+ public void CountMergeOperatorTest() {
+   QueryId stubQueryId = new QueryId("stub_query");
+
+   long largestPeekMemory = 0;
+   long largestReturnSize = 0;
+   long totalRetainedSize = 0;
+
+   List<Operator> inputs = new ArrayList<>(4);
+   for (int index = 0; index < 4; index++) {
+     Operator input = Mockito.mock(Operator.class);
+     Mockito.when(input.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+     Mockito.when(input.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+     Mockito.when(input.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+     inputs.add(input);
+
+     largestPeekMemory = Math.max(largestPeekMemory, input.calculateMaxPeekMemory());
+     largestReturnSize = Math.max(largestReturnSize, input.calculateMaxReturnSize());
+     totalRetainedSize += input.calculateRetainedSizeAfterCallingNext();
+   }
+
+   CountMergeOperator mergeOperator =
+       new CountMergeOperator(
+           stubQueryId.genPlanNodeId(), Mockito.mock(OperatorContext.class), inputs);
+
+   assertEquals(largestPeekMemory, mergeOperator.calculateMaxPeekMemory());
+   assertEquals(largestReturnSize, mergeOperator.calculateMaxReturnSize());
+   assertEquals(totalRetainedSize, mergeOperator.calculateRetainedSizeAfterCallingNext());
+ }
+
+ /**
+  * Builds a SchemaFetchScanOperator on a stub fragment instance context and checks its memory
+  * estimates: peak and return size equal DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES and the retained size
+  * is 0.
+  */
+ @Test
+ public void SchemaFetchScanOperatorTest() {
+   ExecutorService notificationExecutor =
+       IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+   try {
+     FragmentInstanceId stubInstanceId =
+         new FragmentInstanceId(new PlanFragmentId(new QueryId("stub_query"), 0), "stub-instance");
+     FragmentInstanceContext instanceContext =
+         createFragmentInstanceContext(
+             stubInstanceId,
+             new FragmentInstanceStateMachine(stubInstanceId, notificationExecutor));
+
+     // NOTE(review): the context is registered under SeriesScanOperator's simple name although a
+     // SchemaFetchScanOperator is built below - looks like a copy-paste label; confirm.
+     PlanNodeId nodeId = new PlanNodeId("1");
+     instanceContext.addOperatorContext(1, nodeId, SeriesScanOperator.class.getSimpleName());
+
+     SchemaFetchScanOperator scanOperator =
+         new SchemaFetchScanOperator(
+             nodeId, instanceContext.getOperatorContexts().get(0), null, null, null);
+
+     assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, scanOperator.calculateMaxPeekMemory());
+     assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, scanOperator.calculateMaxReturnSize());
+     assertEquals(0L, scanOperator.calculateRetainedSizeAfterCallingNext());
+   } finally {
+     // Always release the single-thread pool created for the state machine.
+     notificationExecutor.shutdown();
+   }
+ }
+
+ /**
+  * Checks the memory estimates of a SchemaFetchMergeOperator over four mocked children: peak and
+  * return size are expected to be the maximum over the children, retained size their sum.
+  */
+ @Test
+ public void SchemaFetchMergeOperatorTest() {
+   long largestPeekMemory = 0;
+   long largestReturnSize = 0;
+   long totalRetainedSize = 0;
+
+   List<Operator> inputs = new ArrayList<>(4);
+   for (int index = 0; index < 4; index++) {
+     Operator input = Mockito.mock(Operator.class);
+     Mockito.when(input.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+     Mockito.when(input.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+     Mockito.when(input.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+     inputs.add(input);
+
+     largestPeekMemory = Math.max(largestPeekMemory, input.calculateMaxPeekMemory());
+     largestReturnSize = Math.max(largestReturnSize, input.calculateMaxReturnSize());
+     totalRetainedSize += input.calculateRetainedSizeAfterCallingNext();
+   }
+
+   SchemaFetchMergeOperator mergeOperator =
+       new SchemaFetchMergeOperator(Mockito.mock(OperatorContext.class), inputs, null);
+
+   assertEquals(largestPeekMemory, mergeOperator.calculateMaxPeekMemory());
+   assertEquals(largestReturnSize, mergeOperator.calculateMaxReturnSize());
+   assertEquals(totalRetainedSize, mergeOperator.calculateRetainedSizeAfterCallingNext());
+ }
+
+ /**
+  * Checks the memory estimates of a SchemaQueryOrderByHeatOperator over four mocked children:
+  * peak and return size are expected to be the sum of the children's return sizes, and retained
+  * size the sum of each child's retained size plus its return size.
+  */
+ @Test
+ public void SchemaQueryOrderByHeatOperatorTest() {
+   long totalPeekMemory = 0;
+   long totalReturnSize = 0;
+   long totalRetainedSize = 0;
+
+   List<Operator> inputs = new ArrayList<>(4);
+   for (int index = 0; index < 4; index++) {
+     Operator input = Mockito.mock(Operator.class);
+     Mockito.when(input.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+     Mockito.when(input.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+     Mockito.when(input.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+     inputs.add(input);
+
+     totalPeekMemory += input.calculateMaxReturnSize();
+     totalReturnSize += input.calculateMaxReturnSize();
+     totalRetainedSize +=
+         input.calculateRetainedSizeAfterCallingNext() + input.calculateMaxReturnSize();
+   }
+
+   SchemaQueryOrderByHeatOperator orderByHeatOperator =
+       new SchemaQueryOrderByHeatOperator(Mockito.mock(OperatorContext.class), inputs);
+
+   assertEquals(totalPeekMemory, orderByHeatOperator.calculateMaxPeekMemory());
+   assertEquals(totalReturnSize, orderByHeatOperator.calculateMaxReturnSize());
+   assertEquals(totalRetainedSize, orderByHeatOperator.calculateRetainedSizeAfterCallingNext());
+ }
+
+ /**
+  * Builds a NodePathsSchemaScanOperator on a stub fragment instance context and checks its memory
+  * estimates: peak and return size equal DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES and the retained size
+  * is 0.
+  */
+ @Test
+ public void NodePathsSchemaScanOperatorTest() {
+   ExecutorService notificationExecutor =
+       IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+   try {
+     FragmentInstanceId stubInstanceId =
+         new FragmentInstanceId(new PlanFragmentId(new QueryId("stub_query"), 0), "stub-instance");
+     FragmentInstanceContext instanceContext =
+         createFragmentInstanceContext(
+             stubInstanceId,
+             new FragmentInstanceStateMachine(stubInstanceId, notificationExecutor));
+
+     // NOTE(review): the context is registered under SeriesScanOperator's simple name although a
+     // NodePathsSchemaScanOperator is built below - looks like a copy-paste label; confirm.
+     PlanNodeId nodeId = new PlanNodeId("1");
+     instanceContext.addOperatorContext(1, nodeId, SeriesScanOperator.class.getSimpleName());
+
+     NodePathsSchemaScanOperator scanOperator =
+         new NodePathsSchemaScanOperator(
+             nodeId, instanceContext.getOperatorContexts().get(0), null, 4);
+
+     assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, scanOperator.calculateMaxPeekMemory());
+     assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, scanOperator.calculateMaxReturnSize());
+     assertEquals(0L, scanOperator.calculateRetainedSizeAfterCallingNext());
+   } finally {
+     // Always release the single-thread pool created for the state machine.
+     notificationExecutor.shutdown();
+   }
+ }
+
+ /**
+  * Wraps a mocked child in a NodePathsConvertOperator and checks its memory estimates: the peak is
+  * the child's peak plus the child's return size, while return size and retained size equal the
+  * child's stubbed values.
+  */
+ @Test
+ public void NodePathsConvertOperatorTest() {
+   Operator source = Mockito.mock(Operator.class);
+   Mockito.when(source.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+   Mockito.when(source.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+   Mockito.when(source.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+
+   // Expectations are derived from the stubbed child before the operator is created.
+   long peekMemoryExpectation = source.calculateMaxPeekMemory() + source.calculateMaxReturnSize();
+   long returnSizeExpectation = source.calculateMaxReturnSize();
+   long retainedSizeExpectation = source.calculateRetainedSizeAfterCallingNext();
+
+   OperatorContext context = Mockito.mock(OperatorContext.class);
+   NodePathsConvertOperator convertOperator = new NodePathsConvertOperator(context, source);
+
+   assertEquals(peekMemoryExpectation, convertOperator.calculateMaxPeekMemory());
+   assertEquals(returnSizeExpectation, convertOperator.calculateMaxReturnSize());
+   assertEquals(retainedSizeExpectation, convertOperator.calculateRetainedSizeAfterCallingNext());
+ }
+
+ /**
+  * Wraps a mocked child in a NodePathsCountOperator and checks its memory estimates against
+  * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES: the peak is the larger of twice that constant and the
+  * child's peak, the return size is the larger of the constant and the child's return size, and
+  * the retained size is the constant plus the child's retained size.
+  */
+ @Test
+ public void NodePathsCountOperatorTest() {
+   Operator source = Mockito.mock(Operator.class);
+   Mockito.when(source.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+   Mockito.when(source.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+   Mockito.when(source.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+
+   // Expectations are derived from the stubbed child before the operator is created.
+   long peekMemoryExpectation =
+       Math.max(2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, source.calculateMaxPeekMemory());
+   long returnSizeExpectation =
+       Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, source.calculateMaxReturnSize());
+   long retainedSizeExpectation =
+       DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + source.calculateRetainedSizeAfterCallingNext();
+
+   OperatorContext context = Mockito.mock(OperatorContext.class);
+   NodePathsCountOperator countOperator = new NodePathsCountOperator(context, source);
+
+   assertEquals(peekMemoryExpectation, countOperator.calculateMaxPeekMemory());
+   assertEquals(returnSizeExpectation, countOperator.calculateMaxReturnSize());
+   assertEquals(retainedSizeExpectation, countOperator.calculateRetainedSizeAfterCallingNext());
+ }
+
+ /**
+  * Wraps a mocked child and an empty set in a NodeManageMemoryMergeOperator and checks its memory
+  * estimates against DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES: the peak is the larger of twice that
+  * constant and the child's peak, the return size is the larger of the constant and the child's
+  * return size, and the retained size is the constant plus the child's retained size.
+  */
+ @Test
+ public void NodeManageMemoryMergeOperatorTest() {
+   Operator source = Mockito.mock(Operator.class);
+   Mockito.when(source.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+   Mockito.when(source.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+   Mockito.when(source.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+
+   // Expectations are derived from the stubbed child before the operator is created.
+   long peekMemoryExpectation =
+       Math.max(2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, source.calculateMaxPeekMemory());
+   long returnSizeExpectation =
+       Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, source.calculateMaxReturnSize());
+   long retainedSizeExpectation =
+       DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + source.calculateRetainedSizeAfterCallingNext();
+
+   NodeManageMemoryMergeOperator mergeOperator =
+       new NodeManageMemoryMergeOperator(
+           Mockito.mock(OperatorContext.class), Collections.emptySet(), source);
+
+   assertEquals(peekMemoryExpectation, mergeOperator.calculateMaxPeekMemory());
+   assertEquals(returnSizeExpectation, mergeOperator.calculateMaxReturnSize());
+   assertEquals(retainedSizeExpectation, mergeOperator.calculateRetainedSizeAfterCallingNext());
+ }
+
+ @Test
+ public void seriesAggregationScanOperatorTest() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ try {
+ MeasurementPath measurementPath = new MeasurementPath("root.sg.d1.s1", TSDataType.TEXT);
+ TypeProvider typeProvider = new TypeProvider();
+ typeProvider.setType("count(root.sg.d1.s1)", TSDataType.INT64);
+ typeProvider.setType("min_time(root.sg.d1.s1)", TSDataType.INT64);
+ typeProvider.setType("first_value(root.sg.d1.s1)", TSDataType.TEXT);
+
+ // case1: without group by, step is SINGLE
+ List<AggregationDescriptor> aggregationDescriptors1 =
+ Arrays.asList(
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new TimeSeriesOperand(measurementPath))),
+ new AggregationDescriptor(
+ AggregationType.COUNT.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new TimeSeriesOperand(measurementPath))));
+
+ SeriesAggregationScanOperator seriesAggregationScanOperator1 =
+ createSeriesAggregationScanOperator(
+ instanceNotificationExecutor,
+ measurementPath,
+ aggregationDescriptors1,
+ null,
+ typeProvider);
+
+ long expectedMaxReturnSize =
+ TimeColumn.SIZE_IN_BYTES_PER_POSITION
+ + 512 * Byte.BYTES
+ + LongColumn.SIZE_IN_BYTES_PER_POSITION;
+ long expectedMaxRetainSize =
+ 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+
+ assertEquals(
+ expectedMaxReturnSize + expectedMaxRetainSize,
+ seriesAggregationScanOperator1.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize, seriesAggregationScanOperator1.calculateMaxReturnSize());
+ assertEquals(
+ expectedMaxRetainSize,
+ seriesAggregationScanOperator1.calculateRetainedSizeAfterCallingNext());
+
+ // case2: without group by, step is PARTIAL
+ List<AggregationDescriptor> aggregationDescriptors2 =
+ Arrays.asList(
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.PARTIAL,
+ Collections.singletonList(new TimeSeriesOperand(measurementPath))),
+ new AggregationDescriptor(
+ AggregationType.COUNT.name().toLowerCase(),
+ AggregationStep.PARTIAL,
+ Collections.singletonList(new TimeSeriesOperand(measurementPath))));
+
+ SeriesAggregationScanOperator seriesAggregationScanOperator2 =
+ createSeriesAggregationScanOperator(
+ instanceNotificationExecutor,
+ measurementPath,
+ aggregationDescriptors2,
+ null,
+ typeProvider);
+
+ expectedMaxReturnSize =
+ TimeColumn.SIZE_IN_BYTES_PER_POSITION
+ + 512 * Byte.BYTES
+ + 2 * LongColumn.SIZE_IN_BYTES_PER_POSITION;
+ expectedMaxRetainSize = 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+
+ assertEquals(
+ expectedMaxReturnSize + expectedMaxRetainSize,
+ seriesAggregationScanOperator2.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize, seriesAggregationScanOperator2.calculateMaxReturnSize());
+ assertEquals(
+ expectedMaxRetainSize,
+ seriesAggregationScanOperator2.calculateRetainedSizeAfterCallingNext());
+
+ long maxTsBlockLineNumber =
+ TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
+
+ // case3: with group by, total window num < 1000
+ GroupByTimeParameter groupByTimeParameter =
+ new GroupByTimeParameter(
+ 0,
+ 2 * maxTsBlockLineNumber,
+ maxTsBlockLineNumber / 100,
+ maxTsBlockLineNumber / 100,
+ true);
+ List<AggregationDescriptor> aggregationDescriptors3 =
+ Arrays.asList(
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new TimeSeriesOperand(measurementPath))),
+ new AggregationDescriptor(
+ AggregationType.COUNT.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new TimeSeriesOperand(measurementPath))));
+
+ SeriesAggregationScanOperator seriesAggregationScanOperator3 =
+ createSeriesAggregationScanOperator(
+ instanceNotificationExecutor,
+ measurementPath,
+ aggregationDescriptors3,
+ groupByTimeParameter,
+ typeProvider);
+
+ expectedMaxReturnSize =
+ 200
+ * (TimeColumn.SIZE_IN_BYTES_PER_POSITION
+ + 512 * Byte.BYTES
+ + LongColumn.SIZE_IN_BYTES_PER_POSITION);
+ expectedMaxRetainSize = 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+
+ assertEquals(
+ expectedMaxReturnSize + expectedMaxRetainSize,
+ seriesAggregationScanOperator3.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize, seriesAggregationScanOperator3.calculateMaxReturnSize());
+ assertEquals(
+ expectedMaxRetainSize,
+ seriesAggregationScanOperator3.calculateRetainedSizeAfterCallingNext());
+
+ // case4: with group by, total window num > 1000
+ groupByTimeParameter = new GroupByTimeParameter(0, 2 * maxTsBlockLineNumber, 1, 1, true);
+ List<AggregationDescriptor> aggregationDescriptors4 =
+ Arrays.asList(
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new TimeSeriesOperand(measurementPath))),
+ new AggregationDescriptor(
+ AggregationType.COUNT.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new TimeSeriesOperand(measurementPath))));
+
+ SeriesAggregationScanOperator seriesAggregationScanOperator4 =
+ createSeriesAggregationScanOperator(
+ instanceNotificationExecutor,
+ measurementPath,
+ aggregationDescriptors4,
+ groupByTimeParameter,
+ typeProvider);
+
+ expectedMaxReturnSize =
+ maxTsBlockLineNumber
+ * (TimeColumn.SIZE_IN_BYTES_PER_POSITION
+ + 512 * Byte.BYTES
+ + LongColumn.SIZE_IN_BYTES_PER_POSITION);
+ expectedMaxRetainSize = 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+
+ assertEquals(
+ expectedMaxReturnSize + expectedMaxRetainSize,
+ seriesAggregationScanOperator4.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize, seriesAggregationScanOperator4.calculateMaxReturnSize());
+ assertEquals(
+ expectedMaxRetainSize,
+ seriesAggregationScanOperator4.calculateRetainedSizeAfterCallingNext());
+
+ // case5: over DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES
+ groupByTimeParameter = new GroupByTimeParameter(0, 2 * maxTsBlockLineNumber, 1, 1, true);
+ List<AggregationDescriptor> aggregationDescriptors5 =
+ Arrays.asList(
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new TimeSeriesOperand(measurementPath))),
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new TimeSeriesOperand(measurementPath))),
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(new TimeSeriesOperand(measurementPath))));
+
+ SeriesAggregationScanOperator seriesAggregationScanOperator5 =
+ createSeriesAggregationScanOperator(
+ instanceNotificationExecutor,
+ measurementPath,
+ aggregationDescriptors5,
+ groupByTimeParameter,
+ typeProvider);
+
+ expectedMaxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ expectedMaxRetainSize = 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+
+ assertEquals(
+ expectedMaxReturnSize + expectedMaxRetainSize,
+ seriesAggregationScanOperator5.calculateMaxPeekMemory());
+ assertEquals(expectedMaxReturnSize, seriesAggregationScanOperator5.calculateMaxReturnSize());
+ assertEquals(
+ expectedMaxRetainSize,
+ seriesAggregationScanOperator5.calculateRetainedSizeAfterCallingNext());
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
+
+ /**
+  * Builds a {@link SeriesAggregationScanOperator} on a stub fragment instance for the
+  * memory-estimation assertions.
+  *
+  * <p>The operator's max return size is not hard-coded here: it is computed by
+  * {@code AggregationUtil.calculateMaxAggregationResultSize} from the descriptors, the time range
+  * iterator and the type provider, and the result is passed to the operator constructor.
+  *
+  * @param instanceNotificationExecutor executor handed to the fragment instance state machine
+  * @param measurementPath series to aggregate; its series type selects the accumulators
+  * @param aggregationDescriptors aggregations to evaluate; one {@link Aggregator} is built per
+  *     entry, in list order
+  * @param groupByTimeParameter group-by-time definition; callers in this test pass {@code null}
+  *     when the query has no group by
+  * @param typeProvider type lookup used when sizing the aggregation result
+  * @return the configured operator
+  * @throws IllegalPathException kept in the signature for the existing callers
+  */
+ private SeriesAggregationScanOperator createSeriesAggregationScanOperator(
+     ExecutorService instanceNotificationExecutor,
+     MeasurementPath measurementPath,
+     List<AggregationDescriptor> aggregationDescriptors,
+     GroupByTimeParameter groupByTimeParameter,
+     TypeProvider typeProvider)
+     throws IllegalPathException {
+   Set<String> allSensors = Sets.newHashSet("s1");
+
+   // Stub query / fragment instance that only exists to host the operator context.
+   QueryId queryId = new QueryId("stub_query");
+   FragmentInstanceId instanceId =
+       new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+   FragmentInstanceStateMachine stateMachine =
+       new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+   FragmentInstanceContext fragmentInstanceContext =
+       createFragmentInstanceContext(instanceId, stateMachine);
+   PlanNodeId planNodeId = new PlanNodeId("1");
+   // Register the context under the name of the operator that is actually built below.
+   fragmentInstanceContext.addOperatorContext(
+       1, planNodeId, SeriesAggregationScanOperator.class.getSimpleName());
+
+   // One aggregator per descriptor, kept in descriptor order.
+   List<Aggregator> aggregators = new ArrayList<>(aggregationDescriptors.size());
+   for (AggregationDescriptor descriptor : aggregationDescriptors) {
+     aggregators.add(
+         new Aggregator(
+             AccumulatorFactory.createAccumulator(
+                 descriptor.getAggregationType(), measurementPath.getSeriesType(), true),
+             descriptor.getStep()));
+   }
+
+   ITimeRangeIterator timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, true, true);
+   long maxReturnSize =
+       AggregationUtil.calculateMaxAggregationResultSize(
+           aggregationDescriptors, timeRangeIterator, typeProvider);
+
+   return new SeriesAggregationScanOperator(
+       planNodeId,
+       measurementPath,
+       allSensors,
+       fragmentInstanceContext.getOperatorContexts().get(0),
+       aggregators,
+       timeRangeIterator,
+       null, // no time filter
+       true, // ascending
+       groupByTimeParameter,
+       maxReturnSize);
+ }
+
+ /**
+  * Verifies the memory estimates reported by a {@link RawDataAggregationOperator} that reads from
+  * a single mocked child operator.
+  */
+ @Test
+ public void rawDataAggregationOperatorTest() throws IllegalPathException {
+   // Child stub with fixed estimates: peek 2048, return size 1024, retained 512.
+   Operator mockedChild = Mockito.mock(Operator.class);
+   Mockito.when(mockedChild.calculateMaxPeekMemory()).thenReturn(2048L);
+   Mockito.when(mockedChild.calculateMaxReturnSize()).thenReturn(1024L);
+   Mockito.when(mockedChild.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
+
+   MeasurementPath seriesPath = new MeasurementPath("root.sg.d1.s1", TSDataType.TEXT);
+   TypeProvider outputTypes = new TypeProvider();
+   outputTypes.setType("count(root.sg.d1.s1)", TSDataType.INT64);
+   outputTypes.setType("first_value(root.sg.d1.s1)", TSDataType.TEXT);
+
+   // first_value and count over the same TEXT series, both in the FINAL step.
+   AggregationDescriptor firstValueDescriptor =
+       new AggregationDescriptor(
+           AggregationType.FIRST_VALUE.name().toLowerCase(),
+           AggregationStep.FINAL,
+           Collections.singletonList(new TimeSeriesOperand(seriesPath)));
+   AggregationDescriptor countDescriptor =
+       new AggregationDescriptor(
+           AggregationType.COUNT.name().toLowerCase(),
+           AggregationStep.FINAL,
+           Collections.singletonList(new TimeSeriesOperand(seriesPath)));
+   List<AggregationDescriptor> descriptors = Arrays.asList(firstValueDescriptor, countDescriptor);
+
+   List<Aggregator> aggregators = new ArrayList<>();
+   for (AggregationDescriptor descriptor : descriptors) {
+     aggregators.add(
+         new Aggregator(
+             AccumulatorFactory.createAccumulator(
+                 descriptor.getAggregationType(), seriesPath.getSeriesType(), true),
+             descriptor.getStep()));
+   }
+
+   GroupByTimeParameter groupBy = new GroupByTimeParameter(0, 1000, 10, 10, true);
+   ITimeRangeIterator windows = initTimeRangeIterator(groupBy, true, false);
+   long maxReturnSize =
+       AggregationUtil.calculateMaxAggregationResultSize(descriptors, windows, outputTypes);
+
+   RawDataAggregationOperator operatorUnderTest =
+       new RawDataAggregationOperator(
+           Mockito.mock(OperatorContext.class),
+           aggregators,
+           windows,
+           mockedChild,
+           true,
+           maxReturnSize);
+
+   // The expectation uses 100 output rows (presumably one per window of the group-by above;
+   // confirm against GroupByTimeParameter), each holding a time value, a 512-byte slot for
+   // first_value and a long for count.
+   long bytesPerRow =
+       512 * Byte.BYTES
+           + TimeColumn.SIZE_IN_BYTES_PER_POSITION
+           + LongColumn.SIZE_IN_BYTES_PER_POSITION;
+   long expectedReturnSize = 100 * bytesPerRow;
+   // Retained after next(): the child's return size plus what the child itself retains.
+   long childReturnSize = mockedChild.calculateMaxReturnSize();
+   long childRetainedSize = mockedChild.calculateRetainedSizeAfterCallingNext();
+   long expectedRetainedSize = childReturnSize + childRetainedSize;
+
+   assertEquals(
+       expectedReturnSize + expectedRetainedSize, operatorUnderTest.calculateMaxPeekMemory());
+   assertEquals(expectedReturnSize, operatorUnderTest.calculateMaxReturnSize());
+   assertEquals(expectedRetainedSize, operatorUnderTest.calculateRetainedSizeAfterCallingNext());
+ }
+
+ /**
+  * Verifies the memory estimates reported by a {@link SlidingWindowAggregationOperator} that
+  * reads from a single mocked child operator.
+  */
+ @Test
+ public void slidingWindowAggregationOperatorTest() throws IllegalPathException {
+   // Child stub with fixed estimates: peek 2048, return size 1024, retained 512.
+   Operator mockedChild = Mockito.mock(Operator.class);
+   Mockito.when(mockedChild.calculateMaxPeekMemory()).thenReturn(2048L);
+   Mockito.when(mockedChild.calculateMaxReturnSize()).thenReturn(1024L);
+   Mockito.when(mockedChild.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
+
+   MeasurementPath seriesPath = new MeasurementPath("root.sg.d1.s1", TSDataType.TEXT);
+   TypeProvider outputTypes = new TypeProvider();
+   outputTypes.setType("count(root.sg.d1.s1)", TSDataType.INT64);
+   outputTypes.setType("first_value(root.sg.d1.s1)", TSDataType.TEXT);
+
+   // first_value and count over the same TEXT series, both in the FINAL step.
+   AggregationDescriptor firstValueDescriptor =
+       new AggregationDescriptor(
+           AggregationType.FIRST_VALUE.name().toLowerCase(),
+           AggregationStep.FINAL,
+           Collections.singletonList(new TimeSeriesOperand(seriesPath)));
+   AggregationDescriptor countDescriptor =
+       new AggregationDescriptor(
+           AggregationType.COUNT.name().toLowerCase(),
+           AggregationStep.FINAL,
+           Collections.singletonList(new TimeSeriesOperand(seriesPath)));
+   List<AggregationDescriptor> descriptors = Arrays.asList(firstValueDescriptor, countDescriptor);
+
+   List<Aggregator> aggregators = new ArrayList<>();
+   for (AggregationDescriptor descriptor : descriptors) {
+     aggregators.add(
+         new Aggregator(
+             AccumulatorFactory.createAccumulator(
+                 descriptor.getAggregationType(), seriesPath.getSeriesType(), true),
+             descriptor.getStep()));
+   }
+
+   GroupByTimeParameter groupBy = new GroupByTimeParameter(0, 1000, 10, 5, true);
+   ITimeRangeIterator windows = initTimeRangeIterator(groupBy, true, false);
+   long maxReturnSize =
+       AggregationUtil.calculateMaxAggregationResultSize(descriptors, windows, outputTypes);
+
+   SlidingWindowAggregationOperator operatorUnderTest =
+       new SlidingWindowAggregationOperator(
+           Mockito.mock(OperatorContext.class),
+           aggregators,
+           windows,
+           mockedChild,
+           true,
+           groupBy,
+           maxReturnSize);
+
+   // The expectation uses 200 output rows (presumably one per sliding window of the group-by
+   // above; confirm against GroupByTimeParameter), each holding a time value, a 512-byte slot
+   // for first_value and a long for count.
+   long bytesPerRow =
+       512 * Byte.BYTES
+           + TimeColumn.SIZE_IN_BYTES_PER_POSITION
+           + LongColumn.SIZE_IN_BYTES_PER_POSITION;
+   long expectedReturnSize = 200 * bytesPerRow;
+   // Retained after next(): the child's return size plus what the child itself retains.
+   long childReturnSize = mockedChild.calculateMaxReturnSize();
+   long childRetainedSize = mockedChild.calculateRetainedSizeAfterCallingNext();
+   long expectedRetainedSize = childReturnSize + childRetainedSize;
+
+   assertEquals(
+       expectedReturnSize + expectedRetainedSize, operatorUnderTest.calculateMaxPeekMemory());
+   assertEquals(expectedReturnSize, operatorUnderTest.calculateMaxReturnSize());
+   assertEquals(expectedRetainedSize, operatorUnderTest.calculateRetainedSizeAfterCallingNext());
+ }
+
+ /**
+  * Verifies the memory estimates reported by an {@link AggregationOperator} that merges four
+  * mocked child operators.
+  */
+ @Test
+ public void aggregationOperatorTest() throws IllegalPathException {
+   // Four identical child stubs: peek 128 KiB, return size 64 KiB, retained 64 KiB each.
+   final int childCount = 4;
+   List<Operator> childOperators = new ArrayList<>(childCount);
+   long totalChildRetainedSize = 0L;
+   long totalChildReturnSize = 0L;
+   int created = 0;
+   while (created < childCount) {
+     Operator mockedChild = Mockito.mock(Operator.class);
+     Mockito.when(mockedChild.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+     Mockito.when(mockedChild.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+     Mockito.when(mockedChild.calculateRetainedSizeAfterCallingNext()).thenReturn(64 * 1024L);
+     totalChildRetainedSize += 64 * 1024L;
+     totalChildReturnSize += 64 * 1024L;
+     childOperators.add(mockedChild);
+     created++;
+   }
+
+   MeasurementPath seriesPath = new MeasurementPath("root.sg.d1.s1", TSDataType.TEXT);
+   TypeProvider outputTypes = new TypeProvider();
+   outputTypes.setType("count(root.sg.d1.s1)", TSDataType.INT64);
+   outputTypes.setType("first_value(root.sg.d1.s1)", TSDataType.TEXT);
+
+   // first_value and count over the same TEXT series, both in the FINAL step.
+   AggregationDescriptor firstValueDescriptor =
+       new AggregationDescriptor(
+           AggregationType.FIRST_VALUE.name().toLowerCase(),
+           AggregationStep.FINAL,
+           Collections.singletonList(new TimeSeriesOperand(seriesPath)));
+   AggregationDescriptor countDescriptor =
+       new AggregationDescriptor(
+           AggregationType.COUNT.name().toLowerCase(),
+           AggregationStep.FINAL,
+           Collections.singletonList(new TimeSeriesOperand(seriesPath)));
+   List<AggregationDescriptor> descriptors = Arrays.asList(firstValueDescriptor, countDescriptor);
+
+   List<Aggregator> aggregators = new ArrayList<>();
+   for (AggregationDescriptor descriptor : descriptors) {
+     aggregators.add(
+         new Aggregator(
+             AccumulatorFactory.createAccumulator(
+                 descriptor.getAggregationType(), seriesPath.getSeriesType(), true),
+             descriptor.getStep()));
+   }
+
+   GroupByTimeParameter groupBy = new GroupByTimeParameter(0, 1000, 10, 10, true);
+   ITimeRangeIterator windows = initTimeRangeIterator(groupBy, true, false);
+   long maxReturnSize =
+       AggregationUtil.calculateMaxAggregationResultSize(descriptors, windows, outputTypes);
+
+   AggregationOperator operatorUnderTest =
+       new AggregationOperator(
+           Mockito.mock(OperatorContext.class),
+           aggregators,
+           windows,
+           childOperators,
+           maxReturnSize);
+
+   // The expectation uses 100 output rows (presumably one per window of the group-by above;
+   // confirm against GroupByTimeParameter), each holding a time value, a 512-byte slot for
+   // first_value and a long for count.
+   long bytesPerRow =
+       512 * Byte.BYTES
+           + TimeColumn.SIZE_IN_BYTES_PER_POSITION
+           + LongColumn.SIZE_IN_BYTES_PER_POSITION;
+   long expectedReturnSize = 100 * bytesPerRow;
+   // Retained after next(): every child's return size plus every child's own retained size.
+   long expectedRetainedSize = totalChildReturnSize + totalChildRetainedSize;
+
+   assertEquals(
+       expectedReturnSize + expectedRetainedSize, operatorUnderTest.calculateMaxPeekMemory());
+   assertEquals(expectedReturnSize, operatorUnderTest.calculateMaxReturnSize());
+   assertEquals(expectedRetainedSize, operatorUnderTest.calculateRetainedSizeAfterCallingNext());
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
index 6a9902f24b..f25b0d2fcd 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
@@ -64,6 +64,8 @@ import java.util.concurrent.ExecutorService;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
import static org.junit.Assert.assertEquals;
public class RawDataAggregationOperatorTest {
@@ -381,8 +383,9 @@ public class RawDataAggregationOperatorTest {
return new RawDataAggregationOperator(
fragmentInstanceContext.getOperatorContexts().get(3),
aggregators,
+ initTimeRangeIterator(groupByTimeParameter, true, true),
timeJoinOperator,
true,
- groupByTimeParameter);
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
index 71fd167404..f3b37b83f7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
@@ -59,6 +59,8 @@ import java.util.concurrent.ExecutorService;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
import static org.junit.Assert.assertEquals;
public class SeriesAggregationScanOperatorTest {
@@ -515,9 +517,11 @@ public class SeriesAggregationScanOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(0),
aggregators,
+ initTimeRangeIterator(groupByTimeParameter, ascending, true),
timeFilter,
ascending,
- groupByTimeParameter);
+ groupByTimeParameter,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
seriesAggregationScanOperator.initQueryDataSource(
new QueryDataSource(seqResources, unSeqResources));
return seriesAggregationScanOperator;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
index 81ab99e1ff..a0ed242e63 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
@@ -61,6 +61,8 @@ import java.util.stream.Collectors;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
public class SlidingWindowAggregationOperatorTest {
@@ -232,9 +234,11 @@ public class SlidingWindowAggregationOperatorTest {
Collections.singleton("sensor0"),
fragmentInstanceContext.getOperatorContexts().get(0),
aggregators,
+ initTimeRangeIterator(groupByTimeParameter, ascending, true),
null,
ascending,
- groupByTimeParameter);
+ groupByTimeParameter,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
seriesAggregationScanOperator.initQueryDataSource(
new QueryDataSource(seqResources, unSeqResources));
@@ -254,8 +258,10 @@ public class SlidingWindowAggregationOperatorTest {
return new SlidingWindowAggregationOperator(
fragmentInstanceContext.getOperatorContexts().get(1),
finalAggregators,
+ initTimeRangeIterator(groupByTimeParameter, ascending, false),
seriesAggregationScanOperator,
ascending,
- groupByTimeParameter);
+ groupByTimeParameter,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
index 34b9b8fa87..bff57278b1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
@@ -56,6 +56,8 @@ import java.util.concurrent.ExecutorService;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -211,9 +213,11 @@ public class UpdateLastCacheOperatorTest {
allSensors,
fragmentInstanceContext.getOperatorContexts().get(0),
aggregators,
+ initTimeRangeIterator(groupByTimeParameter, ascending, true),
timeFilter,
ascending,
- groupByTimeParameter);
+ groupByTimeParameter,
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
seriesAggregationScanOperator.initQueryDataSource(
new QueryDataSource(seqResources, unSeqResources));
diff --git a/server/src/test/java/org/apache/iotdb/db/rescon/ResourceManagerTest.java b/server/src/test/java/org/apache/iotdb/db/rescon/ResourceManagerTest.java
index 15da188fa1..4eb277d663 100644
--- a/server/src/test/java/org/apache/iotdb/db/rescon/ResourceManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/rescon/ResourceManagerTest.java
@@ -74,15 +74,14 @@ public class ResourceManagerTest {
List<TsFileResource> unseqResources = new ArrayList<>();
private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
- private TsFileResourceManager tsFileResourceManager = TsFileResourceManager.getInstance();
- private double prevTimeIndexMemoryProportion;
- private double prevTimeIndexMemoryThreshold;
+ private final TsFileResourceManager tsFileResourceManager = TsFileResourceManager.getInstance();
+ private long prevTimeIndexMemoryThreshold;
private TimeIndexLevel timeIndexLevel;
@Before
public void setUp() throws IOException, WriteProcessException, MetadataException {
IoTDB.configManager.init();
- prevTimeIndexMemoryProportion = CONFIG.getTimeIndexMemoryProportion();
+ prevTimeIndexMemoryThreshold = CONFIG.getAllocateMemoryForTimeIndex();
timeIndexLevel = CONFIG.getTimeIndexLevel();
prepareSeries();
}
@@ -92,10 +91,7 @@ public class ResourceManagerTest {
removeFiles();
seqResources.clear();
unseqResources.clear();
- CONFIG.setTimeIndexMemoryProportion(prevTimeIndexMemoryProportion);
CONFIG.setTimeIndexLevel(String.valueOf(timeIndexLevel));
- prevTimeIndexMemoryThreshold =
- prevTimeIndexMemoryProportion * CONFIG.getAllocateMemoryForRead();
tsFileResourceManager.setTimeIndexMemoryThreshold(prevTimeIndexMemoryThreshold);
ChunkCache.getInstance().clear();
TimeSeriesMetadataCache.getInstance().clear();
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 7827dae848..2eb1ef7a52 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -95,6 +95,8 @@ public enum TSStatusCode {
UNSUPPORTED_INDEX_FUNC_ERROR(421),
UNSUPPORTED_INDEX_TYPE_ERROR(422),
+ MEMORY_NOT_ENOUGH(423),
+
INTERNAL_SERVER_ERROR(500),
CLOSE_OPERATION_ERROR(501),
READ_ONLY_SYSTEM_ERROR(502),