You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/08/11 12:28:24 UTC
[iotdb] branch lmh/AggOpMemoryControl created (now e64b5c5679)
This is an automated email from the ASF dual-hosted git repository.
hui pushed a change to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git
at e64b5c5679 temp save
This branch includes the following new commits:
new ad8467a3a8 memory control for SeriesAggregationScanOperator
new 99c77b97e6 memory control for RawDataAggregationOperator
new 7fc64e99a0 memory control for SlidingWindowAggregationOperator
new 8e7c5f6a49 memory control for AggregationOperator
new 1a4e677f80 Merge remote-tracking branch 'origin/master' into MemoryControl
new eb0ff4978c getMaxBinarySizeInBytes by empty stats impl
new f3983e0759 Merge github.com:apache/iotdb into MemoryControl
new a8e2f90072 Merge branch 'MemoryControl' of github.com:apache/iotdb into MemoryControl
new e64b5c5679 temp save
The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[iotdb] 08/09: Merge branch 'MemoryControl' of github.com:apache/iotdb into MemoryControl
Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a8e2f90072122a6de7d9c2f5077ecf9ff5e92535
Merge: f3983e0759 30ff39724d
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Aug 10 23:23:03 2022 +0800
Merge branch 'MemoryControl' of github.com:apache/iotdb into MemoryControl
.../iotdb/db/mpp/execution/operator/Operator.java | 12 ++++
.../execution/operator/process/FillOperator.java | 8 ++-
.../execution/operator/process/LimitOperator.java | 5 ++
.../operator/process/LinearFillOperator.java | 8 ++-
.../execution/operator/process/OffsetOperator.java | 5 ++
.../execution/operator/process/SortOperator.java | 5 ++
.../process/join/RowBasedTimeJoinOperator.java | 22 +++++-
.../operator/process/join/TimeJoinOperator.java | 22 +++++-
.../process/last/LastQueryCollectOperator.java | 10 +++
.../process/last/LastQueryMergeOperator.java | 31 ++++++--
.../operator/process/last/LastQueryOperator.java | 14 +++-
.../process/last/LastQuerySortOperator.java | 19 +++--
.../process/last/UpdateLastCacheOperator.java | 5 ++
.../operator/source/AlignedSeriesScanOperator.java | 5 ++
.../operator/source/ExchangeOperator.java | 5 ++
.../operator/source/LastCacheScanOperator.java | 5 ++
.../operator/source/SeriesScanOperator.java | 5 ++
.../mpp/execution/operator/OperatorMemoryTest.java | 84 +++++++++++++++++-----
18 files changed, 229 insertions(+), 41 deletions(-)
diff --cc server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index abc5ab3a4b,349e98758b..67be7fd353
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@@ -423,67 -463,8 +468,68 @@@ public class OperatorMemoryTest
new LinearFillOperator(
Mockito.mock(OperatorContext.class), new LinearFill[] {null, null}, child);
- assertEquals(2048 * 3, linearFillOperator.calculateMaxPeekMemory());
+ assertEquals(2048 * 3 + 512L, linearFillOperator.calculateMaxPeekMemory());
assertEquals(1024, linearFillOperator.calculateMaxReturnSize());
+ assertEquals(512, linearFillOperator.calculateRetainedSizeAfterCallingNext());
}
+
+ @Test
+ public void seriesAggregationScanOperatorTest() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ try {
+ MeasurementPath measurementPath =
+ new MeasurementPath(
+ "root.SeriesAggregationScanOperatorTest.device0.sensor0", TSDataType.INT32);
+ Set<String> allSensors = Sets.newHashSet("sensor0");
+
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId, SeriesAggregationScanOperatorTest.class.getSimpleName());
+
+ SeriesAggregationScanOperator seriesAggregationScanOperator =
+ new SeriesAggregationScanOperator(
+ planNodeId,
+ measurementPath,
+ allSensors,
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ Arrays.asList(
+ new Aggregator(
+ AccumulatorFactory.createAccumulator(
+ AggregationType.COUNT, TSDataType.INT32, true),
+ AggregationStep.SINGLE),
+ new Aggregator(
+ AccumulatorFactory.createAccumulator(
+ AggregationType.MAX_VALUE, TSDataType.INT32, true),
+ AggregationStep.SINGLE),
+ new Aggregator(
+ AccumulatorFactory.createAccumulator(
+ AggregationType.MIN_TIME, TSDataType.INT32, true),
+ AggregationStep.SINGLE)),
+ null,
+ true,
+ null);
+
+ assertEquals(
+ 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
+ + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+ seriesAggregationScanOperator.calculateMaxPeekMemory());
+ assertEquals(
+ 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
+ + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+ seriesAggregationScanOperator.calculateMaxReturnSize());
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
}
[iotdb] 05/09: Merge remote-tracking branch 'origin/master' into MemoryControl
Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1a4e677f8006e907dd81e613bc59395612104311
Merge: 8e7c5f6a49 a8880b51d2
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Aug 10 12:51:58 2022 +0800
Merge remote-tracking branch 'origin/master' into MemoryControl
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 2 +-
.../assembly/resources/sbin/remove-confignode.sh | 40 +++++++------
.../assembly/resources/sbin/start-confignode.sh | 36 ++++++------
.../src/assembly/resources/sbin/stop-confignode.sh | 11 ++--
.../AsyncConfigNodeHeartbeatClientPool.java | 4 +-
.../async/datanode/AsyncDataNodeClientPool.java | 4 +-
.../datanode/AsyncDataNodeHeartbeatClientPool.java | 4 +-
.../sync/datanode/SyncDataNodeClientPool.java | 4 +-
.../iotdb/confignode/conf/ConfigNodeConfig.java | 41 -------------
.../confignode/conf/ConfigNodeDescriptor.java | 17 ------
.../iotdb/confignode/persistence/NodeInfo.java | 6 +-
.../procedure/impl/RegionMigrateProcedure.java | 5 +-
.../procedure/state/RegionTransitionState.java | 1 -
.../service/thrift/ConfigNodeRPCService.java | 15 +++--
.../apache/iotdb/consensus/config/RatisConfig.java | 2 +-
.../Administration-Management/Administration.md | 68 +++++++++++-----------
.../Administration-Management/Administration.md | 68 +++++++++++-----------
grafana-plugin/backend-compile.bat | 31 ++++++++++
grafana-plugin/go.mod | 2 +-
grafana-plugin/go.sum | 15 ++---
grafana-plugin/pom.xml | 54 +++++++++--------
.../iotdb/commons/client/ClientPoolFactory.java | 16 ++---
.../apache/iotdb/commons/conf/CommonConfig.java | 40 +++++++++++++
.../iotdb/commons/conf/CommonDescriptor.java | 17 ++++++
.../apache/iotdb/db/client/ConfigNodeClient.java | 3 +-
.../schema/TimeSeriesSchemaScanOperator.java | 25 +++-----
.../iotdb/db/mpp/plan/parser/ASTVisitor.java | 3 +
.../scheduler/FixedRateFragInsStateTracker.java | 10 ++--
.../handler/PhysicalPlanValidationHandler.java | 4 +-
.../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java | 3 +
.../iotdb/db/sync/pipedata/DeletionPipeData.java | 12 ++--
.../apache/iotdb/db/sync/pipedata/PipeData.java | 24 ++++++--
.../iotdb/db/sync/pipedata/SchemaPipeData.java | 12 ++--
.../iotdb/db/sync/pipedata/TsFilePipeData.java | 19 ++++--
.../sync/pipedata/queue/BufferedPipeDataQueue.java | 2 +-
.../transport/server/TransportServiceImpl.java | 2 +-
.../java/org/apache/iotdb/db/qp/PlannerTest.java | 11 ++++
.../iotdb/db/sync/pipedata/PipeDataTest.java | 12 ++--
.../src/main/thrift/confignode.thrift | 14 ++++-
.../tsfile/read/common/block/TsBlockBuilder.java | 16 +++++
40 files changed, 384 insertions(+), 291 deletions(-)
[iotdb] 02/09: memory control for RawDataAggregationOperator
Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 99c77b97e600239b5c0b194b255e44955af8262d
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Aug 10 11:29:21 2022 +0800
memory control for RawDataAggregationOperator
---
.../operator/process/RawDataAggregationOperator.java | 15 +++++++++++++++
1 file changed, 15 insertions(+)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index 88fb5fe20a..1cd922bb46 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -23,12 +23,14 @@ import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.utils.Pair;
import java.util.List;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateAggregationFromRawData;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
/**
* RawDataAggregationOperator is used to process raw data tsBlock input calculating using value
@@ -80,4 +82,17 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
inputTsBlock = calcResult.getRight();
return calcResult.getLeft();
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return calculateMaxReturnSize() + child.calculateMaxReturnSize();
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ // time + all value columns
+ return (1L + inputTsBlock.getValueColumnCount())
+ * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
+ + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ }
}
[iotdb] 03/09: memory control for SlidingWindowAggregationOperator
Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7fc64e99a042b8782d0c547568ac5866ee4cf375
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Aug 10 11:32:49 2022 +0800
memory control for SlidingWindowAggregationOperator
---
.../operator/process/SlidingWindowAggregationOperator.java | 11 +++++++++++
1 file changed, 11 insertions(+)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
index 10303016c5..9f1050772c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -30,6 +30,7 @@ import java.util.List;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
public class SlidingWindowAggregationOperator extends SingleInputAggregationOperator {
@@ -104,4 +105,14 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper
}
curSubTimeRange = null;
}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return calculateMaxReturnSize() + child.calculateMaxReturnSize();
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ }
}
[iotdb] 06/09: getMaxBinarySizeInBytes by empty stats impl
Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit eb0ff4978c70109c9c80fde8fd46071ccd63f913
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Aug 10 23:14:41 2022 +0800
getMaxBinarySizeInBytes by empty stats impl
---
.../iotdb/db/metadata/LocalSchemaProcessor.java | 6 +-
.../db/metadata/rescon/SchemaResourceManager.java | 4 +-
...tatistics.java => SchemaStatisticsManager.java} | 12 ++--
.../schemaregion/SchemaRegionMemoryImpl.java | 12 ++--
.../schemaregion/SchemaRegionSchemaFileImpl.java | 12 ++--
.../iotdb/db/mpp/statistics/StatisticsManager.java | 46 +++++++++++++++
.../iotdb/db/mpp/statistics/TimeseriesStats.java | 24 ++++++++
.../mpp/execution/operator/OperatorMemoryTest.java | 65 ++++++++++++++++++++++
8 files changed, 158 insertions(+), 23 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
index 32518a6239..fafd13ddd9 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
@@ -37,7 +37,7 @@ import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
-import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
+import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.metadata.template.Template;
@@ -466,7 +466,7 @@ public class LocalSchemaProcessor {
// todo this is for test assistance, refactor this to support massive timeseries
if (pathPattern.getFullPath().equals("root.**")
&& TemplateManager.getInstance().getAllTemplateName().isEmpty()) {
- return (int) TimeseriesStatistics.getInstance().getTotalSeriesNumber();
+ return (int) SchemaStatisticsManager.getInstance().getTotalSeriesNumber();
}
int count = 0;
for (ISchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, isPrefixMatch)) {
@@ -1380,7 +1380,7 @@ public class LocalSchemaProcessor {
@TestOnly
public long getTotalSeriesNumber() {
- return TimeseriesStatistics.getInstance().getTotalSeriesNumber();
+ return SchemaStatisticsManager.getInstance().getTotalSeriesNumber();
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
index 0fe18ac955..c5df8ff9a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
@@ -30,7 +30,7 @@ public class SchemaResourceManager {
private SchemaResourceManager() {}
public static void initSchemaResource() {
- TimeseriesStatistics.getInstance().init();
+ SchemaStatisticsManager.getInstance().init();
MemoryStatistics.getInstance().init();
if (IoTDBDescriptor.getInstance()
.getConfig()
@@ -41,7 +41,7 @@ public class SchemaResourceManager {
}
public static void clearSchemaResource() {
- TimeseriesStatistics.getInstance().clear();
+ SchemaStatisticsManager.getInstance().clear();
MemoryStatistics.getInstance().clear();
if (IoTDBDescriptor.getInstance()
.getConfig()
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java
similarity index 87%
rename from server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java
rename to server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java
index 097d1accce..f79e3f7a20 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java
@@ -26,22 +26,22 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
import java.util.concurrent.atomic.AtomicLong;
-public class TimeseriesStatistics {
+public class SchemaStatisticsManager {
private final AtomicLong totalSeriesNumber = new AtomicLong();
- private static class TimeseriesStatisticsHolder {
+ private static class SchemaStatisticsHolder {
- private TimeseriesStatisticsHolder() {
+ private SchemaStatisticsHolder() {
// allowed to do nothing
}
- private static final TimeseriesStatistics INSTANCE = new TimeseriesStatistics();
+ private static final SchemaStatisticsManager INSTANCE = new SchemaStatisticsManager();
}
/** we should not use this function in other place, but only in IoTDB class */
- public static TimeseriesStatistics getInstance() {
- return TimeseriesStatisticsHolder.INSTANCE;
+ public static SchemaStatisticsManager getInstance() {
+ return SchemaStatisticsHolder.INSTANCE;
}
public void init() {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index 78754414eb..1cef2e826e 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -52,7 +52,7 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.mtree.MTreeBelowSGMemoryImpl;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.rescon.MemoryStatistics;
-import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
+import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
import org.apache.iotdb.db.metadata.tag.TagManager;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.metadata.template.TemplateManager;
@@ -165,7 +165,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
private boolean usingMLog = true;
private MLogWriter logWriter;
- private TimeseriesStatistics timeseriesStatistics = TimeseriesStatistics.getInstance();
+ private SchemaStatisticsManager schemaStatisticsManager = SchemaStatisticsManager.getInstance();
private MemoryStatistics memoryStatistics = MemoryStatistics.getInstance();
private final IStorageGroupMNode storageGroupMNode;
@@ -451,7 +451,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
// collect all the LeafMNode in this schema region
List<IMeasurementMNode> leafMNodes = mtree.getAllMeasurementMNode();
- timeseriesStatistics.deleteTimeseries(leafMNodes.size());
+ schemaStatisticsManager.deleteTimeseries(leafMNodes.size());
// drop triggers with no exceptions
TriggerEngine.drop(leafMNodes);
@@ -598,7 +598,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
mNodeCache.invalidate(path.getDevicePath());
// update statistics and schemaDataTypeNumMap
- timeseriesStatistics.addTimeseries(1);
+ schemaStatisticsManager.addTimeseries(1);
// update tag index
if (offset != -1 && isRecovering) {
@@ -715,7 +715,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
mNodeCache.invalidate(prefixPath);
// update statistics and schemaDataTypeNumMap
- timeseriesStatistics.addTimeseries(plan.getMeasurements().size());
+ schemaStatisticsManager.addTimeseries(plan.getMeasurements().size());
List<Long> tagOffsets = plan.getTagOffsets();
for (int i = 0; i < measurements.size(); i++) {
@@ -857,7 +857,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
mNodeCache.invalidate(node.getPartialPath());
- timeseriesStatistics.deleteTimeseries(1);
+ schemaStatisticsManager.deleteTimeseries(1);
return storageGroupPath;
}
// endregion
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index a49e3767de..0602891c67 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -50,7 +50,7 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.mtree.MTreeBelowSGCachedImpl;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.rescon.MemoryStatistics;
-import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
+import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
import org.apache.iotdb.db.metadata.tag.TagManager;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.metadata.template.TemplateManager;
@@ -161,7 +161,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
private File logFile;
private MLogWriter logWriter;
- private TimeseriesStatistics timeseriesStatistics = TimeseriesStatistics.getInstance();
+ private SchemaStatisticsManager schemaStatisticsManager = SchemaStatisticsManager.getInstance();
private MemoryStatistics memoryStatistics = MemoryStatistics.getInstance();
private final IStorageGroupMNode storageGroupMNode;
@@ -412,7 +412,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
// collect all the LeafMNode in this schema region
List<IMeasurementMNode> leafMNodes = mtree.getAllMeasurementMNode();
- timeseriesStatistics.deleteTimeseries(leafMNodes.size());
+ schemaStatisticsManager.deleteTimeseries(leafMNodes.size());
// drop triggers with no exceptions
TriggerEngine.drop(leafMNodes);
@@ -494,7 +494,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
mNodeCache.invalidate(path.getDevicePath());
// update statistics and schemaDataTypeNumMap
- timeseriesStatistics.addTimeseries(1);
+ schemaStatisticsManager.addTimeseries(1);
// update tag index
if (offset != -1 && isRecovering) {
@@ -636,7 +636,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
mNodeCache.invalidate(prefixPath);
// update statistics and schemaDataTypeNumMap
- timeseriesStatistics.addTimeseries(plan.getMeasurements().size());
+ schemaStatisticsManager.addTimeseries(plan.getMeasurements().size());
List<Long> tagOffsets = plan.getTagOffsets();
for (int i = 0; i < measurements.size(); i++) {
@@ -784,7 +784,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
mNodeCache.invalidate(node.getPartialPath());
- timeseriesStatistics.deleteTimeseries(1);
+ schemaStatisticsManager.deleteTimeseries(1);
return storageGroupPath;
}
// endregion
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/StatisticsManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/StatisticsManager.java
new file mode 100644
index 0000000000..44d5fc1c66
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/StatisticsManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.statistics;
+
+import org.apache.iotdb.commons.path.PartialPath;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+public class StatisticsManager {
+
+ private final Map<PartialPath, TimeseriesStats> seriesToStatsMap = Maps.newConcurrentMap();
+
+ public long getMaxBinarySizeInBytes(PartialPath path) {
+ return 512 * Byte.BYTES;
+ }
+
+ public static StatisticsManager getInstance() {
+ return StatisticsManager.StatisticsManagerHelper.INSTANCE;
+ }
+
+ private static class StatisticsManagerHelper {
+
+ private static final StatisticsManager INSTANCE = new StatisticsManager();
+
+ private StatisticsManagerHelper() {}
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/TimeseriesStats.java b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/TimeseriesStats.java
new file mode 100644
index 0000000000..509341d3d5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/TimeseriesStats.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.statistics;
+
+public class TimeseriesStats {
+ // TODO collect time series statistics
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index b57ee14dcf..abc5ab3a4b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -22,6 +22,8 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
@@ -44,9 +46,12 @@ import org.apache.iotdb.db.mpp.execution.operator.process.last.UpdateLastCacheOp
import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -421,4 +426,64 @@ public class OperatorMemoryTest {
assertEquals(2048 * 3, linearFillOperator.calculateMaxPeekMemory());
assertEquals(1024, linearFillOperator.calculateMaxReturnSize());
}
+
+ @Test
+ public void seriesAggregationScanOperatorTest() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ try {
+ MeasurementPath measurementPath =
+ new MeasurementPath(
+ "root.SeriesAggregationScanOperatorTest.device0.sensor0", TSDataType.INT32);
+ Set<String> allSensors = Sets.newHashSet("sensor0");
+
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId, SeriesAggregationScanOperatorTest.class.getSimpleName());
+
+ SeriesAggregationScanOperator seriesAggregationScanOperator =
+ new SeriesAggregationScanOperator(
+ planNodeId,
+ measurementPath,
+ allSensors,
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ Arrays.asList(
+ new Aggregator(
+ AccumulatorFactory.createAccumulator(
+ AggregationType.COUNT, TSDataType.INT32, true),
+ AggregationStep.SINGLE),
+ new Aggregator(
+ AccumulatorFactory.createAccumulator(
+ AggregationType.MAX_VALUE, TSDataType.INT32, true),
+ AggregationStep.SINGLE),
+ new Aggregator(
+ AccumulatorFactory.createAccumulator(
+ AggregationType.MIN_TIME, TSDataType.INT32, true),
+ AggregationStep.SINGLE)),
+ null,
+ true,
+ null);
+
+ assertEquals(
+ 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
+ + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+ seriesAggregationScanOperator.calculateMaxPeekMemory());
+ assertEquals(
+ 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
+ + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+ seriesAggregationScanOperator.calculateMaxReturnSize());
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
}
[iotdb] 07/09: Merge github.com:apache/iotdb into MemoryControl
Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f3983e0759c7b2ed50a60e11b7cbd83433085a15
Merge: eb0ff4978c f1787b83c5
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Aug 10 23:22:02 2022 +0800
Merge github.com:apache/iotdb into MemoryControl
.github/workflows/sync.yml | 20 +-
.../confignode/manager/ClusterSchemaManager.java | 4 +-
.../iotdb/confignode/manager/ConfigManager.java | 12 +-
.../apache/iotdb/confignode/manager/IManager.java | 9 +
.../iotdb/confignode/manager/NodeManager.java | 54 +-
.../confignode/manager/PermissionManager.java | 2 +-
.../iotdb/confignode/manager/UDFManager.java | 4 +-
.../iotdb/confignode/manager/load/LoadManager.java | 16 +-
.../manager/load/balancer/RegionBalancer.java | 2 +-
.../manager/load/balancer/RouteBalancer.java | 2 +-
.../iotdb/confignode/persistence/NodeInfo.java | 16 +-
.../procedure/env/ConfigNodeProcedureEnv.java | 4 +-
.../procedure/env/DataNodeRemoveHandler.java | 6 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 3 +-
.../iotdb/confignode/persistence/NodeInfoTest.java | 4 +-
.../thrift/ConfigNodeRPCServiceProcessorTest.java | 510 ----------------
.../org/apache/iotdb/consensus/IStateMachine.java | 15 +
.../client/AsyncMultiLeaderServiceClient.java | 5 +-
.../client/MultiLeaderConsensusClientPool.java | 5 +-
integration-test/import-control.xml | 4 +-
.../org/apache/iotdb/db/it/IoTDBConfigNodeIT.java | 639 +++++++++++++++++++++
.../sync/IoTDBSyncReceiverCollectorIT.java | 513 -----------------
.../db/integration/sync/IoTDBSyncReceiverIT.java | 200 +------
.../db/integration/sync/IoTDBSyncSenderIT.java | 2 +
.../db/integration/sync/TransportClientMock.java | 9 -
.../db/integration/sync/TransportHandlerMock.java | 3 -
node-commons/pom.xml | 5 +
.../commons/client/AsyncBaseClientFactory.java | 24 +-
.../iotdb/commons/client/ClientPoolFactory.java | 15 +-
.../AsyncConfigNodeHeartbeatServiceClient.java | 5 +-
.../async/AsyncConfigNodeIServiceClient.java | 5 +-
.../async/AsyncDataNodeHeartbeatServiceClient.java | 5 +-
.../async/AsyncDataNodeInternalServiceClient.java | 5 +-
.../AsyncDataNodeMPPDataExchangeServiceClient.java | 5 +-
.../iotdb/commons/concurrent/ThreadName.java | 3 +-
.../apache/iotdb/commons/conf/CommonConfig.java | 13 +
.../iotdb/commons/conf/CommonDescriptor.java | 1 +
.../apache/iotdb/commons/sync}/SyncConstant.java | 22 +-
.../apache/iotdb/commons/sync}/SyncPathUtil.java | 24 +-
.../iotdb/commons/client/ClientManagerTest.java | 4 +-
.../schemaregion/rocksdb/RSchemaRegion.java | 8 +
.../iotdb/db/client/DataNodeClientPoolFactory.java | 28 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 -
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 2 -
.../statemachine/DataRegionStateMachine.java | 6 +
.../sync/PipeDataLoadBearableException.java | 25 -
.../sync/PipeDataLoadUnbearableException.java | 25 -
.../db/metadata/schemaregion/ISchemaRegion.java | 6 +
.../schemaregion/SchemaRegionMemoryImpl.java | 45 ++
.../schemaregion/SchemaRegionSchemaFileImpl.java | 8 +
.../mpp/common/schematree/ClusterSchemaTree.java | 15 +-
.../common/schematree/DeviceGroupSchemaTree.java | 98 ++++
.../db/mpp/common/schematree/DeviceSchemaInfo.java | 110 +++-
.../common/schematree/MeasurementSchemaInfo.java | 53 ++
.../visitor/SchemaTreeDeviceVisitor.java | 13 +-
.../db/mpp/execution/exchange/SourceHandle.java | 2 +-
.../operator/process/FilterAndProjectOperator.java | 11 +-
.../mpp/plan/analyze/StandaloneSchemaFetcher.java | 231 ++------
.../plan/expression/multi/FunctionExpression.java | 3 +
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 56 +-
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 55 +-
.../iotdb/db/qp/physical/sys/CreatePipePlan.java | 2 +-
.../db/qp/physical/sys/CreatePipeSinkPlan.java | 2 +-
.../iotdb/db/sync/common/ISyncInfoFetcher.java | 69 +++
.../iotdb/db/sync/common/LocalSyncInfoFetcher.java | 186 ++++++
.../org/apache/iotdb/db/sync/common/SyncInfo.java | 292 ++++++++++
.../db/sync/common/persistence/SyncLogReader.java | 186 ++++++
.../db/sync/common/persistence/SyncLogWriter.java | 147 +++++
.../iotdb/db/sync/datasource/AbstractOpBlock.java | 16 +-
.../iotdb/db/sync/datasource/DeletionGroup.java | 242 ++++++++
.../iotdb/db/sync/datasource/ModsfileOpBlock.java | 53 --
.../iotdb/db/sync/datasource/PipeOpManager.java | 6 +-
.../iotdb/db/sync/datasource/TsFileOpBlock.java | 463 ++++++++++++---
.../iotdb/db/sync/externalpipe/ExtPipePlugin.java | 15 +-
.../db/sync/externalpipe/ExtPipePluginManager.java | 4 +-
.../iotdb/db/sync/pipedata/TsFilePipeData.java | 14 +-
.../sync/pipedata/queue/BufferedPipeDataQueue.java | 4 +-
.../sync/pipedata/queue/PipeDataQueueFactory.java | 57 --
.../iotdb/db/sync/receiver/ReceiverService.java | 195 +------
.../db/sync/receiver/collector/Collector.java | 171 ------
.../db/sync/receiver/load/DeletionLoader.java | 5 +-
.../iotdb/db/sync/receiver/load/SchemaLoader.java | 8 +-
.../iotdb/db/sync/receiver/load/TsFileLoader.java | 3 +-
.../db/sync/receiver/manager/ReceiverManager.java | 229 --------
.../db/sync/receiver/recovery/ReceiverLog.java | 127 ----
.../receiver/recovery/ReceiverLogAnalyzer.java | 157 -----
.../iotdb/db/sync/sender/pipe/IoTDBPipeSink.java | 2 +-
.../org/apache/iotdb/db/sync/sender/pipe/Pipe.java | 4 -
.../manager => sender/pipe}/PipeInfo.java | 61 +-
.../iotdb/db/sync/sender/pipe/TsFilePipe.java | 14 +-
.../iotdb/db/sync/sender/pipe/TsFilePipeInfo.java | 63 ++
.../db/sync/sender/recovery/SenderLogAnalyzer.java | 165 ------
.../db/sync/sender/recovery/SenderLogger.java | 141 -----
.../db/sync/sender/recovery/TsFilePipeLogger.java | 4 +-
.../iotdb/db/sync/sender/service/MsgManager.java | 114 ----
.../db/sync/sender/service/SenderService.java | 377 ++++++------
.../db/sync/sender/service/TransportHandler.java | 56 +-
.../db/sync/transport/client/ClientWrapper.java | 4 +-
.../db/sync/transport/client/ITransportClient.java | 10 +-
...rtClient.java => IoTDBSInkTransportClient.java} | 148 +----
.../db/sync/transport/conf/TransportConstant.java | 36 --
.../transport/server/TransportServiceImpl.java | 48 +-
.../apache/iotdb/db/utils/sync/SyncPipeUtil.java | 96 ++++
.../db/sync/datasource/DeletionGroupTest.java | 231 ++++++++
.../db/sync/datasource/PipeOpManagerTest.java | 226 +++++++-
.../db/sync/datasource/TsFileOpBlockTest.java | 372 +++++++++++-
.../sync/pipedata/BufferedPipeDataQueueTest.java | 4 +-
...{ReceiverManagerTest.java => SyncInfoTest.java} | 71 +--
...ceiverLogAnalyzerTest.java => SyncLogTest.java} | 68 ++-
.../db/sync/transport/TransportServiceTest.java | 197 +++----
.../apache/iotdb/db/utils/EnvironmentUtils.java | 13 +-
.../datanode1conf/iotdb-datanode.properties | 1 +
.../datanode2conf/iotdb-datanode.properties | 1 +
.../datanode3conf/iotdb-datanode.properties | 1 +
thrift-sync/src/main/thrift/transport.thrift | 27 -
115 files changed, 4291 insertions(+), 3929 deletions(-)
[iotdb] 01/09: memory control for SeriesAggregationScanOperator
Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ad8467a3a8c664d8f5ca0de4327f7a8f33bdba13
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Aug 10 11:21:33 2022 +0800
memory control for SeriesAggregationScanOperator
---
.../source/AbstractSeriesAggregationScanOperator.java | 18 ++++++++++++++++++
1 file changed, 18 insertions(+)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index 2d7d671be9..a88fff89d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.TimeRange;
@@ -42,6 +43,7 @@ import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendA
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateAggregationFromRawData;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.isAllAggregatorsHasFinalResult;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
public abstract class AbstractSeriesAggregationScanOperator implements DataSourceOperator {
@@ -68,6 +70,8 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
protected boolean finished = false;
+ private final long maxReturnSize;
+
public AbstractSeriesAggregationScanOperator(
PlanNodeId sourceId,
OperatorContext context,
@@ -91,6 +95,10 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
}
this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+
+ this.maxReturnSize =
+ (1L + subSensorSize) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
+ + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
}
@Override
@@ -108,6 +116,16 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
seriesScanUtil.initQueryDataSource(dataSource);
}
+ @Override
+ public long calculateMaxPeekMemory() {
+ return maxReturnSize;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return maxReturnSize;
+ }
+
@Override
public boolean hasNext() {
return timeRangeIterator.hasNextTimeRange();
[iotdb] 04/09: memory control for AggregationOperator
Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8e7c5f6a49f2285e711509d6247884da45db8d28
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Aug 10 11:41:04 2022 +0800
memory control for AggregationOperator
---
.../execution/operator/process/AggregationOperator.java | 17 +++++++++++++++++
1 file changed, 17 insertions(+)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 8cf4aef8b0..92da980d1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit;
import static com.google.common.util.concurrent.Futures.successfulAsList;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
/**
* AggregationOperator can process the situation: aggregation of intermediate aggregate result, it
@@ -95,6 +96,22 @@ public class AggregationOperator implements ProcessOperator {
return operatorContext;
}
+ @Override
+ public long calculateMaxPeekMemory() {
+ long maxPeekMemory = calculateMaxReturnSize();
+ long childrenMaxPeekMemory = 0;
+ for (Operator child : children) {
+ maxPeekMemory += child.calculateMaxReturnSize();
+ childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+ }
+ return Math.max(maxPeekMemory, childrenMaxPeekMemory);
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return (1L + inputOperatorsCount) * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ }
+
@Override
public ListenableFuture<?> isBlocked() {
List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
[iotdb] 09/09: temp save
Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e64b5c567905c3996f44b9bd5b671e39bf8a7185
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Thu Aug 11 20:27:22 2022 +0800
temp save
---
.../aggregation/timerangeiterator/AggrWindowIterator.java | 5 +++++
.../aggregation/timerangeiterator/ITimeRangeIterator.java | 2 ++
.../timerangeiterator/SingleTimeWindowIterator.java | 5 +++++
.../source/AbstractSeriesAggregationScanOperator.java | 14 ++++++++++----
4 files changed, 22 insertions(+), 4 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/AggrWindowIterator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/AggrWindowIterator.java
index f27c56223c..98dd32abc0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/AggrWindowIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/AggrWindowIterator.java
@@ -176,4 +176,9 @@ public class AggrWindowIterator implements ITimeRangeIterator {
public long currentOutputTime() {
return leftCRightO ? curTimeRange.getMin() : curTimeRange.getMax();
}
+
+ @Override
+ public int getTotalTimeRangeNum() {
+ return 0;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/ITimeRangeIterator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/ITimeRangeIterator.java
index ccc999e810..76161dd684 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/ITimeRangeIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/ITimeRangeIterator.java
@@ -55,4 +55,6 @@ public interface ITimeRangeIterator {
* @return minTime if leftCloseRightOpen, else maxTime.
*/
long currentOutputTime();
+
+ int getTotalTimeRangeNum();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SingleTimeWindowIterator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SingleTimeWindowIterator.java
index b998557604..291e4f3b4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SingleTimeWindowIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SingleTimeWindowIterator.java
@@ -71,4 +71,9 @@ public class SingleTimeWindowIterator implements ITimeRangeIterator {
public long currentOutputTime() {
return curTimeRange.getMin();
}
+
+ @Override
+ public int getTotalTimeRangeNum() {
+ return 1;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index a88fff89d5..fee5102441 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -70,6 +70,7 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
protected boolean finished = false;
+ private final long maxRetainedSize;
private final long maxReturnSize;
public AbstractSeriesAggregationScanOperator(
@@ -96,9 +97,9 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
}
this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
- this.maxReturnSize =
- (1L + subSensorSize) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
- + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ this.maxRetainedSize =
+ (1L + subSensorSize) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ this.maxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
}
@Override
@@ -118,7 +119,7 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
@Override
public long calculateMaxPeekMemory() {
- return maxReturnSize;
+ return maxRetainedSize + maxReturnSize;
}
@Override
@@ -126,6 +127,11 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
return maxReturnSize;
}
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return maxRetainedSize;
+ }
+
@Override
public boolean hasNext() {
return timeRangeIterator.hasNextTimeRange();