You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/08/11 12:28:30 UTC

[iotdb] 06/09: getMaxBinarySizeInBytes by empty stats impl

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit eb0ff4978c70109c9c80fde8fd46071ccd63f913
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Aug 10 23:14:41 2022 +0800

    getMaxBinarySizeInBytes by empty stats impl
---
 .../iotdb/db/metadata/LocalSchemaProcessor.java    |  6 +-
 .../db/metadata/rescon/SchemaResourceManager.java  |  4 +-
 ...tatistics.java => SchemaStatisticsManager.java} | 12 ++--
 .../schemaregion/SchemaRegionMemoryImpl.java       | 12 ++--
 .../schemaregion/SchemaRegionSchemaFileImpl.java   | 12 ++--
 .../iotdb/db/mpp/statistics/StatisticsManager.java | 46 +++++++++++++++
 .../iotdb/db/mpp/statistics/TimeseriesStats.java   | 24 ++++++++
 .../mpp/execution/operator/OperatorMemoryTest.java | 65 ++++++++++++++++++++++
 8 files changed, 158 insertions(+), 23 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
index 32518a6239..fafd13ddd9 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
@@ -37,7 +37,7 @@ import org.apache.iotdb.db.metadata.mnode.IMNode;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
 import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
-import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
+import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.metadata.template.Template;
@@ -466,7 +466,7 @@ public class LocalSchemaProcessor {
     // todo this is for test assistance, refactor this to support massive timeseries
     if (pathPattern.getFullPath().equals("root.**")
         && TemplateManager.getInstance().getAllTemplateName().isEmpty()) {
-      return (int) TimeseriesStatistics.getInstance().getTotalSeriesNumber();
+      return (int) SchemaStatisticsManager.getInstance().getTotalSeriesNumber();
     }
     int count = 0;
     for (ISchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, isPrefixMatch)) {
@@ -1380,7 +1380,7 @@ public class LocalSchemaProcessor {
 
   @TestOnly
   public long getTotalSeriesNumber() {
-    return TimeseriesStatistics.getInstance().getTotalSeriesNumber();
+    return SchemaStatisticsManager.getInstance().getTotalSeriesNumber();
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
index 0fe18ac955..c5df8ff9a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
@@ -30,7 +30,7 @@ public class SchemaResourceManager {
   private SchemaResourceManager() {}
 
   public static void initSchemaResource() {
-    TimeseriesStatistics.getInstance().init();
+    SchemaStatisticsManager.getInstance().init();
     MemoryStatistics.getInstance().init();
     if (IoTDBDescriptor.getInstance()
         .getConfig()
@@ -41,7 +41,7 @@ public class SchemaResourceManager {
   }
 
   public static void clearSchemaResource() {
-    TimeseriesStatistics.getInstance().clear();
+    SchemaStatisticsManager.getInstance().clear();
     MemoryStatistics.getInstance().clear();
     if (IoTDBDescriptor.getInstance()
         .getConfig()
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java
similarity index 87%
rename from server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java
rename to server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java
index 097d1accce..f79e3f7a20 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java
@@ -26,22 +26,22 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
 
 import java.util.concurrent.atomic.AtomicLong;
 
-public class TimeseriesStatistics {
+public class SchemaStatisticsManager {
 
   private final AtomicLong totalSeriesNumber = new AtomicLong();
 
-  private static class TimeseriesStatisticsHolder {
+  private static class SchemaStatisticsHolder {
 
-    private TimeseriesStatisticsHolder() {
+    private SchemaStatisticsHolder() {
       // allowed to do nothing
     }
 
-    private static final TimeseriesStatistics INSTANCE = new TimeseriesStatistics();
+    private static final SchemaStatisticsManager INSTANCE = new SchemaStatisticsManager();
   }
 
   /** we should not use this function in other place, but only in IoTDB class */
-  public static TimeseriesStatistics getInstance() {
-    return TimeseriesStatisticsHolder.INSTANCE;
+  public static SchemaStatisticsManager getInstance() {
+    return SchemaStatisticsHolder.INSTANCE;
   }
 
   public void init() {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index 78754414eb..1cef2e826e 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -52,7 +52,7 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.metadata.mtree.MTreeBelowSGMemoryImpl;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.rescon.MemoryStatistics;
-import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
+import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
 import org.apache.iotdb.db.metadata.tag.TagManager;
 import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.metadata.template.TemplateManager;
@@ -165,7 +165,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
   private boolean usingMLog = true;
   private MLogWriter logWriter;
 
-  private TimeseriesStatistics timeseriesStatistics = TimeseriesStatistics.getInstance();
+  private SchemaStatisticsManager schemaStatisticsManager = SchemaStatisticsManager.getInstance();
   private MemoryStatistics memoryStatistics = MemoryStatistics.getInstance();
 
   private final IStorageGroupMNode storageGroupMNode;
@@ -451,7 +451,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     // collect all the LeafMNode in this schema region
     List<IMeasurementMNode> leafMNodes = mtree.getAllMeasurementMNode();
 
-    timeseriesStatistics.deleteTimeseries(leafMNodes.size());
+    schemaStatisticsManager.deleteTimeseries(leafMNodes.size());
 
     // drop triggers with no exceptions
     TriggerEngine.drop(leafMNodes);
@@ -598,7 +598,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
       mNodeCache.invalidate(path.getDevicePath());
 
       // update statistics and schemaDataTypeNumMap
-      timeseriesStatistics.addTimeseries(1);
+      schemaStatisticsManager.addTimeseries(1);
 
       // update tag index
       if (offset != -1 && isRecovering) {
@@ -715,7 +715,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
       mNodeCache.invalidate(prefixPath);
 
       // update statistics and schemaDataTypeNumMap
-      timeseriesStatistics.addTimeseries(plan.getMeasurements().size());
+      schemaStatisticsManager.addTimeseries(plan.getMeasurements().size());
 
       List<Long> tagOffsets = plan.getTagOffsets();
       for (int i = 0; i < measurements.size(); i++) {
@@ -857,7 +857,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
 
     mNodeCache.invalidate(node.getPartialPath());
 
-    timeseriesStatistics.deleteTimeseries(1);
+    schemaStatisticsManager.deleteTimeseries(1);
     return storageGroupPath;
   }
   // endregion
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index a49e3767de..0602891c67 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -50,7 +50,7 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.metadata.mtree.MTreeBelowSGCachedImpl;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.rescon.MemoryStatistics;
-import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
+import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
 import org.apache.iotdb.db.metadata.tag.TagManager;
 import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.metadata.template.TemplateManager;
@@ -161,7 +161,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
   private File logFile;
   private MLogWriter logWriter;
 
-  private TimeseriesStatistics timeseriesStatistics = TimeseriesStatistics.getInstance();
+  private SchemaStatisticsManager schemaStatisticsManager = SchemaStatisticsManager.getInstance();
   private MemoryStatistics memoryStatistics = MemoryStatistics.getInstance();
 
   private final IStorageGroupMNode storageGroupMNode;
@@ -412,7 +412,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
     // collect all the LeafMNode in this schema region
     List<IMeasurementMNode> leafMNodes = mtree.getAllMeasurementMNode();
 
-    timeseriesStatistics.deleteTimeseries(leafMNodes.size());
+    schemaStatisticsManager.deleteTimeseries(leafMNodes.size());
 
     // drop triggers with no exceptions
     TriggerEngine.drop(leafMNodes);
@@ -494,7 +494,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
         mNodeCache.invalidate(path.getDevicePath());
 
         // update statistics and schemaDataTypeNumMap
-        timeseriesStatistics.addTimeseries(1);
+        schemaStatisticsManager.addTimeseries(1);
 
         // update tag index
         if (offset != -1 && isRecovering) {
@@ -636,7 +636,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
         mNodeCache.invalidate(prefixPath);
 
         // update statistics and schemaDataTypeNumMap
-        timeseriesStatistics.addTimeseries(plan.getMeasurements().size());
+        schemaStatisticsManager.addTimeseries(plan.getMeasurements().size());
 
         List<Long> tagOffsets = plan.getTagOffsets();
         for (int i = 0; i < measurements.size(); i++) {
@@ -784,7 +784,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
 
     mNodeCache.invalidate(node.getPartialPath());
 
-    timeseriesStatistics.deleteTimeseries(1);
+    schemaStatisticsManager.deleteTimeseries(1);
     return storageGroupPath;
   }
   // endregion
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/StatisticsManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/StatisticsManager.java
new file mode 100644
index 0000000000..44d5fc1c66
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/StatisticsManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.statistics;
+
+import org.apache.iotdb.commons.path.PartialPath;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+public class StatisticsManager {
+
+  private final Map<PartialPath, TimeseriesStats> seriesToStatsMap = Maps.newConcurrentMap();
+
+  public long getMaxBinarySizeInBytes(PartialPath path) {
+    return 512 * Byte.BYTES;
+  }
+
+  public static StatisticsManager getInstance() {
+    return StatisticsManager.StatisticsManagerHelper.INSTANCE;
+  }
+
+  private static class StatisticsManagerHelper {
+
+    private static final StatisticsManager INSTANCE = new StatisticsManager();
+
+    private StatisticsManagerHelper() {}
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/TimeseriesStats.java b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/TimeseriesStats.java
new file mode 100644
index 0000000000..509341d3d5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/TimeseriesStats.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.statistics;
+
+public class TimeseriesStats {
+  // TODO collect time series statistics
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index b57ee14dcf..abc5ab3a4b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -22,6 +22,8 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
@@ -44,9 +46,12 @@ import org.apache.iotdb.db.mpp.execution.operator.process.last.UpdateLastCacheOp
 import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -421,4 +426,64 @@ public class OperatorMemoryTest {
     assertEquals(2048 * 3, linearFillOperator.calculateMaxPeekMemory());
     assertEquals(1024, linearFillOperator.calculateMaxReturnSize());
   }
+
+  @Test
+  public void seriesAggregationScanOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      MeasurementPath measurementPath =
+          new MeasurementPath(
+              "root.SeriesAggregationScanOperatorTest.device0.sensor0", TSDataType.INT32);
+      Set<String> allSensors = Sets.newHashSet("sensor0");
+
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, SeriesAggregationScanOperatorTest.class.getSimpleName());
+
+      SeriesAggregationScanOperator seriesAggregationScanOperator =
+          new SeriesAggregationScanOperator(
+              planNodeId,
+              measurementPath,
+              allSensors,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              Arrays.asList(
+                  new Aggregator(
+                      AccumulatorFactory.createAccumulator(
+                          AggregationType.COUNT, TSDataType.INT32, true),
+                      AggregationStep.SINGLE),
+                  new Aggregator(
+                      AccumulatorFactory.createAccumulator(
+                          AggregationType.MAX_VALUE, TSDataType.INT32, true),
+                      AggregationStep.SINGLE),
+                  new Aggregator(
+                      AccumulatorFactory.createAccumulator(
+                          AggregationType.MIN_TIME, TSDataType.INT32, true),
+                      AggregationStep.SINGLE)),
+              null,
+              true,
+              null);
+
+      assertEquals(
+          2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
+              + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+          seriesAggregationScanOperator.calculateMaxPeekMemory());
+      assertEquals(
+          2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
+              + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+          seriesAggregationScanOperator.calculateMaxReturnSize());
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
 }