You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/08/11 12:28:24 UTC

[iotdb] branch lmh/AggOpMemoryControl created (now e64b5c5679)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a change to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at e64b5c5679 temp save

This branch includes the following new commits:

     new ad8467a3a8 memory control for SeriesAggregationScanOperator
     new 99c77b97e6 memory control for RawDataAggregationOperator
     new 7fc64e99a0 memory control for SlidingWindowAggregationOperator
     new 8e7c5f6a49 memory control for AggregationOperator
     new 1a4e677f80 Merge remote-tracking branch 'origin/master' into MemoryControl
     new eb0ff4978c getMaxBinarySizeInBytes by empty stats impl
     new f3983e0759 Merge github.com:apache/iotdb into MemoryControl
     new a8e2f90072 Merge branch 'MemoryControl' of github.com:apache/iotdb into MemoryControl
     new e64b5c5679 temp save

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 08/09: Merge branch 'MemoryControl' of github.com:apache/iotdb into MemoryControl

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a8e2f90072122a6de7d9c2f5077ecf9ff5e92535
Merge: f3983e0759 30ff39724d
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Aug 10 23:23:03 2022 +0800

    Merge branch 'MemoryControl' of github.com:apache/iotdb into MemoryControl

 .../iotdb/db/mpp/execution/operator/Operator.java  | 12 ++++
 .../execution/operator/process/FillOperator.java   |  8 ++-
 .../execution/operator/process/LimitOperator.java  |  5 ++
 .../operator/process/LinearFillOperator.java       |  8 ++-
 .../execution/operator/process/OffsetOperator.java |  5 ++
 .../execution/operator/process/SortOperator.java   |  5 ++
 .../process/join/RowBasedTimeJoinOperator.java     | 22 +++++-
 .../operator/process/join/TimeJoinOperator.java    | 22 +++++-
 .../process/last/LastQueryCollectOperator.java     | 10 +++
 .../process/last/LastQueryMergeOperator.java       | 31 ++++++--
 .../operator/process/last/LastQueryOperator.java   | 14 +++-
 .../process/last/LastQuerySortOperator.java        | 19 +++--
 .../process/last/UpdateLastCacheOperator.java      |  5 ++
 .../operator/source/AlignedSeriesScanOperator.java |  5 ++
 .../operator/source/ExchangeOperator.java          |  5 ++
 .../operator/source/LastCacheScanOperator.java     |  5 ++
 .../operator/source/SeriesScanOperator.java        |  5 ++
 .../mpp/execution/operator/OperatorMemoryTest.java | 84 +++++++++++++++++-----
 18 files changed, 229 insertions(+), 41 deletions(-)

diff --cc server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index abc5ab3a4b,349e98758b..67be7fd353
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@@ -423,67 -463,8 +468,68 @@@ public class OperatorMemoryTest 
          new LinearFillOperator(
              Mockito.mock(OperatorContext.class), new LinearFill[] {null, null}, child);
  
-     assertEquals(2048 * 3, linearFillOperator.calculateMaxPeekMemory());
+     assertEquals(2048 * 3 + 512L, linearFillOperator.calculateMaxPeekMemory());
      assertEquals(1024, linearFillOperator.calculateMaxReturnSize());
+     assertEquals(512, linearFillOperator.calculateRetainedSizeAfterCallingNext());
    }
 +
 +  @Test
 +  public void seriesAggregationScanOperatorTest() {
 +    ExecutorService instanceNotificationExecutor =
 +        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
 +    try {
 +      MeasurementPath measurementPath =
 +          new MeasurementPath(
 +              "root.SeriesAggregationScanOperatorTest.device0.sensor0", TSDataType.INT32);
 +      Set<String> allSensors = Sets.newHashSet("sensor0");
 +
 +      QueryId queryId = new QueryId("stub_query");
 +      FragmentInstanceId instanceId =
 +          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
 +      FragmentInstanceStateMachine stateMachine =
 +          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
 +      FragmentInstanceContext fragmentInstanceContext =
 +          createFragmentInstanceContext(instanceId, stateMachine);
 +      PlanNodeId planNodeId = new PlanNodeId("1");
 +      fragmentInstanceContext.addOperatorContext(
 +          1, planNodeId, SeriesAggregationScanOperatorTest.class.getSimpleName());
 +
 +      SeriesAggregationScanOperator seriesAggregationScanOperator =
 +          new SeriesAggregationScanOperator(
 +              planNodeId,
 +              measurementPath,
 +              allSensors,
 +              fragmentInstanceContext.getOperatorContexts().get(0),
 +              Arrays.asList(
 +                  new Aggregator(
 +                      AccumulatorFactory.createAccumulator(
 +                          AggregationType.COUNT, TSDataType.INT32, true),
 +                      AggregationStep.SINGLE),
 +                  new Aggregator(
 +                      AccumulatorFactory.createAccumulator(
 +                          AggregationType.MAX_VALUE, TSDataType.INT32, true),
 +                      AggregationStep.SINGLE),
 +                  new Aggregator(
 +                      AccumulatorFactory.createAccumulator(
 +                          AggregationType.MIN_TIME, TSDataType.INT32, true),
 +                      AggregationStep.SINGLE)),
 +              null,
 +              true,
 +              null);
 +
 +      assertEquals(
 +          2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
 +              + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
 +          seriesAggregationScanOperator.calculateMaxPeekMemory());
 +      assertEquals(
 +          2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
 +              + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
 +          seriesAggregationScanOperator.calculateMaxReturnSize());
 +    } catch (IllegalPathException e) {
 +      e.printStackTrace();
 +      fail();
 +    } finally {
 +      instanceNotificationExecutor.shutdown();
 +    }
 +  }
  }


[iotdb] 05/09: Merge remote-tracking branch 'origin/master' into MemoryControl

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1a4e677f8006e907dd81e613bc59395612104311
Merge: 8e7c5f6a49 a8880b51d2
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Aug 10 12:51:58 2022 +0800

    Merge remote-tracking branch 'origin/master' into MemoryControl

 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  2 +-
 .../assembly/resources/sbin/remove-confignode.sh   | 40 +++++++------
 .../assembly/resources/sbin/start-confignode.sh    | 36 ++++++------
 .../src/assembly/resources/sbin/stop-confignode.sh | 11 ++--
 .../AsyncConfigNodeHeartbeatClientPool.java        |  4 +-
 .../async/datanode/AsyncDataNodeClientPool.java    |  4 +-
 .../datanode/AsyncDataNodeHeartbeatClientPool.java |  4 +-
 .../sync/datanode/SyncDataNodeClientPool.java      |  4 +-
 .../iotdb/confignode/conf/ConfigNodeConfig.java    | 41 -------------
 .../confignode/conf/ConfigNodeDescriptor.java      | 17 ------
 .../iotdb/confignode/persistence/NodeInfo.java     |  6 +-
 .../procedure/impl/RegionMigrateProcedure.java     |  5 +-
 .../procedure/state/RegionTransitionState.java     |  1 -
 .../service/thrift/ConfigNodeRPCService.java       | 15 +++--
 .../apache/iotdb/consensus/config/RatisConfig.java |  2 +-
 .../Administration-Management/Administration.md    | 68 +++++++++++-----------
 .../Administration-Management/Administration.md    | 68 +++++++++++-----------
 grafana-plugin/backend-compile.bat                 | 31 ++++++++++
 grafana-plugin/go.mod                              |  2 +-
 grafana-plugin/go.sum                              | 15 ++---
 grafana-plugin/pom.xml                             | 54 +++++++++--------
 .../iotdb/commons/client/ClientPoolFactory.java    | 16 ++---
 .../apache/iotdb/commons/conf/CommonConfig.java    | 40 +++++++++++++
 .../iotdb/commons/conf/CommonDescriptor.java       | 17 ++++++
 .../apache/iotdb/db/client/ConfigNodeClient.java   |  3 +-
 .../schema/TimeSeriesSchemaScanOperator.java       | 25 +++-----
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       |  3 +
 .../scheduler/FixedRateFragInsStateTracker.java    | 10 ++--
 .../handler/PhysicalPlanValidationHandler.java     |  4 +-
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |  3 +
 .../iotdb/db/sync/pipedata/DeletionPipeData.java   | 12 ++--
 .../apache/iotdb/db/sync/pipedata/PipeData.java    | 24 ++++++--
 .../iotdb/db/sync/pipedata/SchemaPipeData.java     | 12 ++--
 .../iotdb/db/sync/pipedata/TsFilePipeData.java     | 19 ++++--
 .../sync/pipedata/queue/BufferedPipeDataQueue.java |  2 +-
 .../transport/server/TransportServiceImpl.java     |  2 +-
 .../java/org/apache/iotdb/db/qp/PlannerTest.java   | 11 ++++
 .../iotdb/db/sync/pipedata/PipeDataTest.java       | 12 ++--
 .../src/main/thrift/confignode.thrift              | 14 ++++-
 .../tsfile/read/common/block/TsBlockBuilder.java   | 16 +++++
 40 files changed, 384 insertions(+), 291 deletions(-)


[iotdb] 02/09: memory control for RawDataAggregationOperator

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 99c77b97e600239b5c0b194b255e44955af8262d
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Aug 10 11:29:21 2022 +0800

    memory control for RawDataAggregationOperator
---
 .../operator/process/RawDataAggregationOperator.java      | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index 88fb5fe20a..1cd922bb46 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -23,12 +23,14 @@ import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.util.List;
 
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateAggregationFromRawData;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 /**
  * RawDataAggregationOperator is used to process raw data tsBlock input calculating using value
@@ -80,4 +82,17 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
     inputTsBlock = calcResult.getRight();
     return calcResult.getLeft();
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return calculateMaxReturnSize() + child.calculateMaxReturnSize();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    // time + all value columns
+    return (1L + inputTsBlock.getValueColumnCount())
+            * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
+        + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
 }


[iotdb] 03/09: memory control for SlidingWindowAggregationOperator

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7fc64e99a042b8782d0c547568ac5866ee4cf375
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Aug 10 11:32:49 2022 +0800

    memory control for SlidingWindowAggregationOperator
---
 .../operator/process/SlidingWindowAggregationOperator.java    | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
index 10303016c5..9f1050772c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -30,6 +30,7 @@ import java.util.List;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 public class SlidingWindowAggregationOperator extends SingleInputAggregationOperator {
 
@@ -104,4 +105,14 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper
     }
     curSubTimeRange = null;
   }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return calculateMaxReturnSize() + child.calculateMaxReturnSize();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return 2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
 }


[iotdb] 06/09: getMaxBinarySizeInBytes by empty stats impl

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit eb0ff4978c70109c9c80fde8fd46071ccd63f913
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Aug 10 23:14:41 2022 +0800

    getMaxBinarySizeInBytes by empty stats impl
---
 .../iotdb/db/metadata/LocalSchemaProcessor.java    |  6 +-
 .../db/metadata/rescon/SchemaResourceManager.java  |  4 +-
 ...tatistics.java => SchemaStatisticsManager.java} | 12 ++--
 .../schemaregion/SchemaRegionMemoryImpl.java       | 12 ++--
 .../schemaregion/SchemaRegionSchemaFileImpl.java   | 12 ++--
 .../iotdb/db/mpp/statistics/StatisticsManager.java | 46 +++++++++++++++
 .../iotdb/db/mpp/statistics/TimeseriesStats.java   | 24 ++++++++
 .../mpp/execution/operator/OperatorMemoryTest.java | 65 ++++++++++++++++++++++
 8 files changed, 158 insertions(+), 23 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
index 32518a6239..fafd13ddd9 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
@@ -37,7 +37,7 @@ import org.apache.iotdb.db.metadata.mnode.IMNode;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
 import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
-import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
+import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.metadata.template.Template;
@@ -466,7 +466,7 @@ public class LocalSchemaProcessor {
     // todo this is for test assistance, refactor this to support massive timeseries
     if (pathPattern.getFullPath().equals("root.**")
         && TemplateManager.getInstance().getAllTemplateName().isEmpty()) {
-      return (int) TimeseriesStatistics.getInstance().getTotalSeriesNumber();
+      return (int) SchemaStatisticsManager.getInstance().getTotalSeriesNumber();
     }
     int count = 0;
     for (ISchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, isPrefixMatch)) {
@@ -1380,7 +1380,7 @@ public class LocalSchemaProcessor {
 
   @TestOnly
   public long getTotalSeriesNumber() {
-    return TimeseriesStatistics.getInstance().getTotalSeriesNumber();
+    return SchemaStatisticsManager.getInstance().getTotalSeriesNumber();
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
index 0fe18ac955..c5df8ff9a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
@@ -30,7 +30,7 @@ public class SchemaResourceManager {
   private SchemaResourceManager() {}
 
   public static void initSchemaResource() {
-    TimeseriesStatistics.getInstance().init();
+    SchemaStatisticsManager.getInstance().init();
     MemoryStatistics.getInstance().init();
     if (IoTDBDescriptor.getInstance()
         .getConfig()
@@ -41,7 +41,7 @@ public class SchemaResourceManager {
   }
 
   public static void clearSchemaResource() {
-    TimeseriesStatistics.getInstance().clear();
+    SchemaStatisticsManager.getInstance().clear();
     MemoryStatistics.getInstance().clear();
     if (IoTDBDescriptor.getInstance()
         .getConfig()
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java
similarity index 87%
rename from server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java
rename to server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java
index 097d1accce..f79e3f7a20 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java
@@ -26,22 +26,22 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
 
 import java.util.concurrent.atomic.AtomicLong;
 
-public class TimeseriesStatistics {
+public class SchemaStatisticsManager {
 
   private final AtomicLong totalSeriesNumber = new AtomicLong();
 
-  private static class TimeseriesStatisticsHolder {
+  private static class SchemaStatisticsHolder {
 
-    private TimeseriesStatisticsHolder() {
+    private SchemaStatisticsHolder() {
       // allowed to do nothing
     }
 
-    private static final TimeseriesStatistics INSTANCE = new TimeseriesStatistics();
+    private static final SchemaStatisticsManager INSTANCE = new SchemaStatisticsManager();
   }
 
   /** we should not use this function in other place, but only in IoTDB class */
-  public static TimeseriesStatistics getInstance() {
-    return TimeseriesStatisticsHolder.INSTANCE;
+  public static SchemaStatisticsManager getInstance() {
+    return SchemaStatisticsHolder.INSTANCE;
   }
 
   public void init() {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index 78754414eb..1cef2e826e 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -52,7 +52,7 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.metadata.mtree.MTreeBelowSGMemoryImpl;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.rescon.MemoryStatistics;
-import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
+import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
 import org.apache.iotdb.db.metadata.tag.TagManager;
 import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.metadata.template.TemplateManager;
@@ -165,7 +165,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
   private boolean usingMLog = true;
   private MLogWriter logWriter;
 
-  private TimeseriesStatistics timeseriesStatistics = TimeseriesStatistics.getInstance();
+  private SchemaStatisticsManager schemaStatisticsManager = SchemaStatisticsManager.getInstance();
   private MemoryStatistics memoryStatistics = MemoryStatistics.getInstance();
 
   private final IStorageGroupMNode storageGroupMNode;
@@ -451,7 +451,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     // collect all the LeafMNode in this schema region
     List<IMeasurementMNode> leafMNodes = mtree.getAllMeasurementMNode();
 
-    timeseriesStatistics.deleteTimeseries(leafMNodes.size());
+    schemaStatisticsManager.deleteTimeseries(leafMNodes.size());
 
     // drop triggers with no exceptions
     TriggerEngine.drop(leafMNodes);
@@ -598,7 +598,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
       mNodeCache.invalidate(path.getDevicePath());
 
       // update statistics and schemaDataTypeNumMap
-      timeseriesStatistics.addTimeseries(1);
+      schemaStatisticsManager.addTimeseries(1);
 
       // update tag index
       if (offset != -1 && isRecovering) {
@@ -715,7 +715,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
       mNodeCache.invalidate(prefixPath);
 
       // update statistics and schemaDataTypeNumMap
-      timeseriesStatistics.addTimeseries(plan.getMeasurements().size());
+      schemaStatisticsManager.addTimeseries(plan.getMeasurements().size());
 
       List<Long> tagOffsets = plan.getTagOffsets();
       for (int i = 0; i < measurements.size(); i++) {
@@ -857,7 +857,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
 
     mNodeCache.invalidate(node.getPartialPath());
 
-    timeseriesStatistics.deleteTimeseries(1);
+    schemaStatisticsManager.deleteTimeseries(1);
     return storageGroupPath;
   }
   // endregion
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index a49e3767de..0602891c67 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -50,7 +50,7 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.metadata.mtree.MTreeBelowSGCachedImpl;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.rescon.MemoryStatistics;
-import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
+import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
 import org.apache.iotdb.db.metadata.tag.TagManager;
 import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.metadata.template.TemplateManager;
@@ -161,7 +161,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
   private File logFile;
   private MLogWriter logWriter;
 
-  private TimeseriesStatistics timeseriesStatistics = TimeseriesStatistics.getInstance();
+  private SchemaStatisticsManager schemaStatisticsManager = SchemaStatisticsManager.getInstance();
   private MemoryStatistics memoryStatistics = MemoryStatistics.getInstance();
 
   private final IStorageGroupMNode storageGroupMNode;
@@ -412,7 +412,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
     // collect all the LeafMNode in this schema region
     List<IMeasurementMNode> leafMNodes = mtree.getAllMeasurementMNode();
 
-    timeseriesStatistics.deleteTimeseries(leafMNodes.size());
+    schemaStatisticsManager.deleteTimeseries(leafMNodes.size());
 
     // drop triggers with no exceptions
     TriggerEngine.drop(leafMNodes);
@@ -494,7 +494,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
         mNodeCache.invalidate(path.getDevicePath());
 
         // update statistics and schemaDataTypeNumMap
-        timeseriesStatistics.addTimeseries(1);
+        schemaStatisticsManager.addTimeseries(1);
 
         // update tag index
         if (offset != -1 && isRecovering) {
@@ -636,7 +636,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
         mNodeCache.invalidate(prefixPath);
 
         // update statistics and schemaDataTypeNumMap
-        timeseriesStatistics.addTimeseries(plan.getMeasurements().size());
+        schemaStatisticsManager.addTimeseries(plan.getMeasurements().size());
 
         List<Long> tagOffsets = plan.getTagOffsets();
         for (int i = 0; i < measurements.size(); i++) {
@@ -784,7 +784,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
 
     mNodeCache.invalidate(node.getPartialPath());
 
-    timeseriesStatistics.deleteTimeseries(1);
+    schemaStatisticsManager.deleteTimeseries(1);
     return storageGroupPath;
   }
   // endregion
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/StatisticsManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/StatisticsManager.java
new file mode 100644
index 0000000000..44d5fc1c66
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/StatisticsManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.statistics;
+
+import org.apache.iotdb.commons.path.PartialPath;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+public class StatisticsManager {
+
+  private final Map<PartialPath, TimeseriesStats> seriesToStatsMap = Maps.newConcurrentMap();
+
+  public long getMaxBinarySizeInBytes(PartialPath path) {
+    return 512 * Byte.BYTES;
+  }
+
+  public static StatisticsManager getInstance() {
+    return StatisticsManager.StatisticsManagerHelper.INSTANCE;
+  }
+
+  private static class StatisticsManagerHelper {
+
+    private static final StatisticsManager INSTANCE = new StatisticsManager();
+
+    private StatisticsManagerHelper() {}
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/TimeseriesStats.java b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/TimeseriesStats.java
new file mode 100644
index 0000000000..509341d3d5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/TimeseriesStats.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.statistics;
+
+public class TimeseriesStats {
+  // TODO collect time series statistics
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index b57ee14dcf..abc5ab3a4b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -22,6 +22,8 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
@@ -44,9 +46,12 @@ import org.apache.iotdb.db.mpp.execution.operator.process.last.UpdateLastCacheOp
 import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -421,4 +426,64 @@ public class OperatorMemoryTest {
     assertEquals(2048 * 3, linearFillOperator.calculateMaxPeekMemory());
     assertEquals(1024, linearFillOperator.calculateMaxReturnSize());
   }
+
+  @Test
+  public void seriesAggregationScanOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      MeasurementPath measurementPath =
+          new MeasurementPath(
+              "root.SeriesAggregationScanOperatorTest.device0.sensor0", TSDataType.INT32);
+      Set<String> allSensors = Sets.newHashSet("sensor0");
+
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, SeriesAggregationScanOperatorTest.class.getSimpleName());
+
+      SeriesAggregationScanOperator seriesAggregationScanOperator =
+          new SeriesAggregationScanOperator(
+              planNodeId,
+              measurementPath,
+              allSensors,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              Arrays.asList(
+                  new Aggregator(
+                      AccumulatorFactory.createAccumulator(
+                          AggregationType.COUNT, TSDataType.INT32, true),
+                      AggregationStep.SINGLE),
+                  new Aggregator(
+                      AccumulatorFactory.createAccumulator(
+                          AggregationType.MAX_VALUE, TSDataType.INT32, true),
+                      AggregationStep.SINGLE),
+                  new Aggregator(
+                      AccumulatorFactory.createAccumulator(
+                          AggregationType.MIN_TIME, TSDataType.INT32, true),
+                      AggregationStep.SINGLE)),
+              null,
+              true,
+              null);
+
+      assertEquals(
+          2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
+              + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+          seriesAggregationScanOperator.calculateMaxPeekMemory());
+      assertEquals(
+          2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
+              + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+          seriesAggregationScanOperator.calculateMaxReturnSize());
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
 }


[iotdb] 07/09: Merge github.com:apache/iotdb into MemoryControl

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f3983e0759c7b2ed50a60e11b7cbd83433085a15
Merge: eb0ff4978c f1787b83c5
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Aug 10 23:22:02 2022 +0800

    Merge github.com:apache/iotdb into MemoryControl

 .github/workflows/sync.yml                         |  20 +-
 .../confignode/manager/ClusterSchemaManager.java   |   4 +-
 .../iotdb/confignode/manager/ConfigManager.java    |  12 +-
 .../apache/iotdb/confignode/manager/IManager.java  |   9 +
 .../iotdb/confignode/manager/NodeManager.java      |  54 +-
 .../confignode/manager/PermissionManager.java      |   2 +-
 .../iotdb/confignode/manager/UDFManager.java       |   4 +-
 .../iotdb/confignode/manager/load/LoadManager.java |  16 +-
 .../manager/load/balancer/RegionBalancer.java      |   2 +-
 .../manager/load/balancer/RouteBalancer.java       |   2 +-
 .../iotdb/confignode/persistence/NodeInfo.java     |  16 +-
 .../procedure/env/ConfigNodeProcedureEnv.java      |   4 +-
 .../procedure/env/DataNodeRemoveHandler.java       |   6 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   3 +-
 .../iotdb/confignode/persistence/NodeInfoTest.java |   4 +-
 .../thrift/ConfigNodeRPCServiceProcessorTest.java  | 510 ----------------
 .../org/apache/iotdb/consensus/IStateMachine.java  |  15 +
 .../client/AsyncMultiLeaderServiceClient.java      |   5 +-
 .../client/MultiLeaderConsensusClientPool.java     |   5 +-
 integration-test/import-control.xml                |   4 +-
 .../org/apache/iotdb/db/it/IoTDBConfigNodeIT.java  | 639 +++++++++++++++++++++
 .../sync/IoTDBSyncReceiverCollectorIT.java         | 513 -----------------
 .../db/integration/sync/IoTDBSyncReceiverIT.java   | 200 +------
 .../db/integration/sync/IoTDBSyncSenderIT.java     |   2 +
 .../db/integration/sync/TransportClientMock.java   |   9 -
 .../db/integration/sync/TransportHandlerMock.java  |   3 -
 node-commons/pom.xml                               |   5 +
 .../commons/client/AsyncBaseClientFactory.java     |  24 +-
 .../iotdb/commons/client/ClientPoolFactory.java    |  15 +-
 .../AsyncConfigNodeHeartbeatServiceClient.java     |   5 +-
 .../async/AsyncConfigNodeIServiceClient.java       |   5 +-
 .../async/AsyncDataNodeHeartbeatServiceClient.java |   5 +-
 .../async/AsyncDataNodeInternalServiceClient.java  |   5 +-
 .../AsyncDataNodeMPPDataExchangeServiceClient.java |   5 +-
 .../iotdb/commons/concurrent/ThreadName.java       |   3 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |  13 +
 .../iotdb/commons/conf/CommonDescriptor.java       |   1 +
 .../apache/iotdb/commons/sync}/SyncConstant.java   |  22 +-
 .../apache/iotdb/commons/sync}/SyncPathUtil.java   |  24 +-
 .../iotdb/commons/client/ClientManagerTest.java    |   4 +-
 .../schemaregion/rocksdb/RSchemaRegion.java        |   8 +
 .../iotdb/db/client/DataNodeClientPoolFactory.java |  28 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  13 -
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   2 -
 .../statemachine/DataRegionStateMachine.java       |   6 +
 .../sync/PipeDataLoadBearableException.java        |  25 -
 .../sync/PipeDataLoadUnbearableException.java      |  25 -
 .../db/metadata/schemaregion/ISchemaRegion.java    |   6 +
 .../schemaregion/SchemaRegionMemoryImpl.java       |  45 ++
 .../schemaregion/SchemaRegionSchemaFileImpl.java   |   8 +
 .../mpp/common/schematree/ClusterSchemaTree.java   |  15 +-
 .../common/schematree/DeviceGroupSchemaTree.java   |  98 ++++
 .../db/mpp/common/schematree/DeviceSchemaInfo.java | 110 +++-
 .../common/schematree/MeasurementSchemaInfo.java   |  53 ++
 .../visitor/SchemaTreeDeviceVisitor.java           |  13 +-
 .../db/mpp/execution/exchange/SourceHandle.java    |   2 +-
 .../operator/process/FilterAndProjectOperator.java |  11 +-
 .../mpp/plan/analyze/StandaloneSchemaFetcher.java  | 231 ++------
 .../plan/expression/multi/FunctionExpression.java  |   3 +
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  56 +-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  55 +-
 .../iotdb/db/qp/physical/sys/CreatePipePlan.java   |   2 +-
 .../db/qp/physical/sys/CreatePipeSinkPlan.java     |   2 +-
 .../iotdb/db/sync/common/ISyncInfoFetcher.java     |  69 +++
 .../iotdb/db/sync/common/LocalSyncInfoFetcher.java | 186 ++++++
 .../org/apache/iotdb/db/sync/common/SyncInfo.java  | 292 ++++++++++
 .../db/sync/common/persistence/SyncLogReader.java  | 186 ++++++
 .../db/sync/common/persistence/SyncLogWriter.java  | 147 +++++
 .../iotdb/db/sync/datasource/AbstractOpBlock.java  |  16 +-
 .../iotdb/db/sync/datasource/DeletionGroup.java    | 242 ++++++++
 .../iotdb/db/sync/datasource/ModsfileOpBlock.java  |  53 --
 .../iotdb/db/sync/datasource/PipeOpManager.java    |   6 +-
 .../iotdb/db/sync/datasource/TsFileOpBlock.java    | 463 ++++++++++++---
 .../iotdb/db/sync/externalpipe/ExtPipePlugin.java  |  15 +-
 .../db/sync/externalpipe/ExtPipePluginManager.java |   4 +-
 .../iotdb/db/sync/pipedata/TsFilePipeData.java     |  14 +-
 .../sync/pipedata/queue/BufferedPipeDataQueue.java |   4 +-
 .../sync/pipedata/queue/PipeDataQueueFactory.java  |  57 --
 .../iotdb/db/sync/receiver/ReceiverService.java    | 195 +------
 .../db/sync/receiver/collector/Collector.java      | 171 ------
 .../db/sync/receiver/load/DeletionLoader.java      |   5 +-
 .../iotdb/db/sync/receiver/load/SchemaLoader.java  |   8 +-
 .../iotdb/db/sync/receiver/load/TsFileLoader.java  |   3 +-
 .../db/sync/receiver/manager/ReceiverManager.java  | 229 --------
 .../db/sync/receiver/recovery/ReceiverLog.java     | 127 ----
 .../receiver/recovery/ReceiverLogAnalyzer.java     | 157 -----
 .../iotdb/db/sync/sender/pipe/IoTDBPipeSink.java   |   2 +-
 .../org/apache/iotdb/db/sync/sender/pipe/Pipe.java |   4 -
 .../manager => sender/pipe}/PipeInfo.java          |  61 +-
 .../iotdb/db/sync/sender/pipe/TsFilePipe.java      |  14 +-
 .../iotdb/db/sync/sender/pipe/TsFilePipeInfo.java  |  63 ++
 .../db/sync/sender/recovery/SenderLogAnalyzer.java | 165 ------
 .../db/sync/sender/recovery/SenderLogger.java      | 141 -----
 .../db/sync/sender/recovery/TsFilePipeLogger.java  |   4 +-
 .../iotdb/db/sync/sender/service/MsgManager.java   | 114 ----
 .../db/sync/sender/service/SenderService.java      | 377 ++++++------
 .../db/sync/sender/service/TransportHandler.java   |  56 +-
 .../db/sync/transport/client/ClientWrapper.java    |   4 +-
 .../db/sync/transport/client/ITransportClient.java |  10 +-
 ...rtClient.java => IoTDBSInkTransportClient.java} | 148 +----
 .../db/sync/transport/conf/TransportConstant.java  |  36 --
 .../transport/server/TransportServiceImpl.java     |  48 +-
 .../apache/iotdb/db/utils/sync/SyncPipeUtil.java   |  96 ++++
 .../db/sync/datasource/DeletionGroupTest.java      | 231 ++++++++
 .../db/sync/datasource/PipeOpManagerTest.java      | 226 +++++++-
 .../db/sync/datasource/TsFileOpBlockTest.java      | 372 +++++++++++-
 .../sync/pipedata/BufferedPipeDataQueueTest.java   |   4 +-
 ...{ReceiverManagerTest.java => SyncInfoTest.java} |  71 +--
 ...ceiverLogAnalyzerTest.java => SyncLogTest.java} |  68 ++-
 .../db/sync/transport/TransportServiceTest.java    | 197 +++----
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |  13 +-
 .../datanode1conf/iotdb-datanode.properties        |   1 +
 .../datanode2conf/iotdb-datanode.properties        |   1 +
 .../datanode3conf/iotdb-datanode.properties        |   1 +
 thrift-sync/src/main/thrift/transport.thrift       |  27 -
 115 files changed, 4291 insertions(+), 3929 deletions(-)



[iotdb] 01/09: memory control for SeriesAggregationScanOperator

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ad8467a3a8c664d8f5ca0de4327f7a8f33bdba13
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Aug 10 11:21:33 2022 +0800

    memory control for SeriesAggregationScanOperator
---
 .../source/AbstractSeriesAggregationScanOperator.java  | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index 2d7d671be9..a88fff89d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
@@ -42,6 +43,7 @@ import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendA
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateAggregationFromRawData;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.isAllAggregatorsHasFinalResult;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 public abstract class AbstractSeriesAggregationScanOperator implements DataSourceOperator {
 
@@ -68,6 +70,8 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
 
   protected boolean finished = false;
 
+  private final long maxReturnSize;
+
   public AbstractSeriesAggregationScanOperator(
       PlanNodeId sourceId,
       OperatorContext context,
@@ -91,6 +95,10 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
       dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
     }
     this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+
+    this.maxReturnSize =
+        (1L + subSensorSize) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
+            + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
   }
 
   @Override
@@ -108,6 +116,16 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
     seriesScanUtil.initQueryDataSource(dataSource);
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return maxReturnSize;
+  }
+
   @Override
   public boolean hasNext() {
     return timeRangeIterator.hasNextTimeRange();


[iotdb] 04/09: memory control for AggregationOperator

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8e7c5f6a49f2285e711509d6247884da45db8d28
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Aug 10 11:41:04 2022 +0800

    memory control for AggregationOperator
---
 .../execution/operator/process/AggregationOperator.java | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 8cf4aef8b0..92da980d1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit;
 import static com.google.common.util.concurrent.Futures.successfulAsList;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 /**
  * AggregationOperator can process the situation: aggregation of intermediate aggregate result, it
@@ -95,6 +96,22 @@ public class AggregationOperator implements ProcessOperator {
     return operatorContext;
   }
 
+  @Override
+  public long calculateMaxPeekMemory() {
+    long maxPeekMemory = calculateMaxReturnSize();
+    long childrenMaxPeekMemory = 0;
+    for (Operator child : children) {
+      maxPeekMemory += child.calculateMaxReturnSize();
+      childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory());
+    }
+    return Math.max(maxPeekMemory, childrenMaxPeekMemory);
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return (1L + inputOperatorsCount) * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
   @Override
   public ListenableFuture<?> isBlocked() {
     List<ListenableFuture<?>> listenableFutures = new ArrayList<>();


[iotdb] 09/09: temp save

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e64b5c567905c3996f44b9bd5b671e39bf8a7185
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Thu Aug 11 20:27:22 2022 +0800

    temp save
---
 .../aggregation/timerangeiterator/AggrWindowIterator.java  |  5 +++++
 .../aggregation/timerangeiterator/ITimeRangeIterator.java  |  2 ++
 .../timerangeiterator/SingleTimeWindowIterator.java        |  5 +++++
 .../source/AbstractSeriesAggregationScanOperator.java      | 14 ++++++++++----
 4 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/AggrWindowIterator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/AggrWindowIterator.java
index f27c56223c..98dd32abc0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/AggrWindowIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/AggrWindowIterator.java
@@ -176,4 +176,9 @@ public class AggrWindowIterator implements ITimeRangeIterator {
   public long currentOutputTime() {
     return leftCRightO ? curTimeRange.getMin() : curTimeRange.getMax();
   }
+
+  @Override
+  public int getTotalTimeRangeNum() {
+    return 0;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/ITimeRangeIterator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/ITimeRangeIterator.java
index ccc999e810..76161dd684 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/ITimeRangeIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/ITimeRangeIterator.java
@@ -55,4 +55,6 @@ public interface ITimeRangeIterator {
    * @return minTime if leftCloseRightOpen, else maxTime.
    */
   long currentOutputTime();
+
+  int getTotalTimeRangeNum();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SingleTimeWindowIterator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SingleTimeWindowIterator.java
index b998557604..291e4f3b4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SingleTimeWindowIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SingleTimeWindowIterator.java
@@ -71,4 +71,9 @@ public class SingleTimeWindowIterator implements ITimeRangeIterator {
   public long currentOutputTime() {
     return curTimeRange.getMin();
   }
+
+  @Override
+  public int getTotalTimeRangeNum() {
+    return 1;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index a88fff89d5..fee5102441 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -70,6 +70,7 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
 
   protected boolean finished = false;
 
+  private final long maxRetainedSize;
   private final long maxReturnSize;
 
   public AbstractSeriesAggregationScanOperator(
@@ -96,9 +97,9 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
     }
     this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
 
-    this.maxReturnSize =
-        (1L + subSensorSize) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
-            + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    this.maxRetainedSize =
+        (1L + subSensorSize) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    this.maxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
   }
 
   @Override
@@ -118,7 +119,7 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
 
   @Override
   public long calculateMaxPeekMemory() {
-    return maxReturnSize;
+    return maxRetainedSize + maxReturnSize;
   }
 
   @Override
@@ -126,6 +127,11 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
     return maxReturnSize;
   }
 
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return maxRetainedSize;
+  }
+
   @Override
   public boolean hasNext() {
     return timeRangeIterator.hasNextTimeRange();