You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/08/11 12:28:30 UTC
[iotdb] 06/09: getMaxBinarySizeInBytes by empty stats impl
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit eb0ff4978c70109c9c80fde8fd46071ccd63f913
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Aug 10 23:14:41 2022 +0800
getMaxBinarySizeInBytes by empty stats impl
---
.../iotdb/db/metadata/LocalSchemaProcessor.java | 6 +-
.../db/metadata/rescon/SchemaResourceManager.java | 4 +-
...tatistics.java => SchemaStatisticsManager.java} | 12 ++--
.../schemaregion/SchemaRegionMemoryImpl.java | 12 ++--
.../schemaregion/SchemaRegionSchemaFileImpl.java | 12 ++--
.../iotdb/db/mpp/statistics/StatisticsManager.java | 46 +++++++++++++++
.../iotdb/db/mpp/statistics/TimeseriesStats.java | 24 ++++++++
.../mpp/execution/operator/OperatorMemoryTest.java | 65 ++++++++++++++++++++++
8 files changed, 158 insertions(+), 23 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
index 32518a6239..fafd13ddd9 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
@@ -37,7 +37,7 @@ import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
-import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
+import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.metadata.template.Template;
@@ -466,7 +466,7 @@ public class LocalSchemaProcessor {
// todo this is for test assistance, refactor this to support massive timeseries
if (pathPattern.getFullPath().equals("root.**")
&& TemplateManager.getInstance().getAllTemplateName().isEmpty()) {
- return (int) TimeseriesStatistics.getInstance().getTotalSeriesNumber();
+ return (int) SchemaStatisticsManager.getInstance().getTotalSeriesNumber();
}
int count = 0;
for (ISchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, isPrefixMatch)) {
@@ -1380,7 +1380,7 @@ public class LocalSchemaProcessor {
@TestOnly
public long getTotalSeriesNumber() {
- return TimeseriesStatistics.getInstance().getTotalSeriesNumber();
+ return SchemaStatisticsManager.getInstance().getTotalSeriesNumber();
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
index 0fe18ac955..c5df8ff9a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
@@ -30,7 +30,7 @@ public class SchemaResourceManager {
private SchemaResourceManager() {}
public static void initSchemaResource() {
- TimeseriesStatistics.getInstance().init();
+ SchemaStatisticsManager.getInstance().init();
MemoryStatistics.getInstance().init();
if (IoTDBDescriptor.getInstance()
.getConfig()
@@ -41,7 +41,7 @@ public class SchemaResourceManager {
}
public static void clearSchemaResource() {
- TimeseriesStatistics.getInstance().clear();
+ SchemaStatisticsManager.getInstance().clear();
MemoryStatistics.getInstance().clear();
if (IoTDBDescriptor.getInstance()
.getConfig()
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java
similarity index 87%
rename from server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java
rename to server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java
index 097d1accce..f79e3f7a20 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java
@@ -26,22 +26,22 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
import java.util.concurrent.atomic.AtomicLong;
-public class TimeseriesStatistics {
+public class SchemaStatisticsManager {
private final AtomicLong totalSeriesNumber = new AtomicLong();
- private static class TimeseriesStatisticsHolder {
+ private static class SchemaStatisticsHolder {
- private TimeseriesStatisticsHolder() {
+ private SchemaStatisticsHolder() {
// allowed to do nothing
}
- private static final TimeseriesStatistics INSTANCE = new TimeseriesStatistics();
+ private static final SchemaStatisticsManager INSTANCE = new SchemaStatisticsManager();
}
/** we should not use this function in other place, but only in IoTDB class */
- public static TimeseriesStatistics getInstance() {
- return TimeseriesStatisticsHolder.INSTANCE;
+ public static SchemaStatisticsManager getInstance() {
+ return SchemaStatisticsHolder.INSTANCE;
}
public void init() {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index 78754414eb..1cef2e826e 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -52,7 +52,7 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.mtree.MTreeBelowSGMemoryImpl;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.rescon.MemoryStatistics;
-import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
+import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
import org.apache.iotdb.db.metadata.tag.TagManager;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.metadata.template.TemplateManager;
@@ -165,7 +165,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
private boolean usingMLog = true;
private MLogWriter logWriter;
- private TimeseriesStatistics timeseriesStatistics = TimeseriesStatistics.getInstance();
+ private SchemaStatisticsManager schemaStatisticsManager = SchemaStatisticsManager.getInstance();
private MemoryStatistics memoryStatistics = MemoryStatistics.getInstance();
private final IStorageGroupMNode storageGroupMNode;
@@ -451,7 +451,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
// collect all the LeafMNode in this schema region
List<IMeasurementMNode> leafMNodes = mtree.getAllMeasurementMNode();
- timeseriesStatistics.deleteTimeseries(leafMNodes.size());
+ schemaStatisticsManager.deleteTimeseries(leafMNodes.size());
// drop triggers with no exceptions
TriggerEngine.drop(leafMNodes);
@@ -598,7 +598,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
mNodeCache.invalidate(path.getDevicePath());
// update statistics and schemaDataTypeNumMap
- timeseriesStatistics.addTimeseries(1);
+ schemaStatisticsManager.addTimeseries(1);
// update tag index
if (offset != -1 && isRecovering) {
@@ -715,7 +715,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
mNodeCache.invalidate(prefixPath);
// update statistics and schemaDataTypeNumMap
- timeseriesStatistics.addTimeseries(plan.getMeasurements().size());
+ schemaStatisticsManager.addTimeseries(plan.getMeasurements().size());
List<Long> tagOffsets = plan.getTagOffsets();
for (int i = 0; i < measurements.size(); i++) {
@@ -857,7 +857,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
mNodeCache.invalidate(node.getPartialPath());
- timeseriesStatistics.deleteTimeseries(1);
+ schemaStatisticsManager.deleteTimeseries(1);
return storageGroupPath;
}
// endregion
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index a49e3767de..0602891c67 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -50,7 +50,7 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.mtree.MTreeBelowSGCachedImpl;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.rescon.MemoryStatistics;
-import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
+import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
import org.apache.iotdb.db.metadata.tag.TagManager;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.metadata.template.TemplateManager;
@@ -161,7 +161,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
private File logFile;
private MLogWriter logWriter;
- private TimeseriesStatistics timeseriesStatistics = TimeseriesStatistics.getInstance();
+ private SchemaStatisticsManager schemaStatisticsManager = SchemaStatisticsManager.getInstance();
private MemoryStatistics memoryStatistics = MemoryStatistics.getInstance();
private final IStorageGroupMNode storageGroupMNode;
@@ -412,7 +412,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
// collect all the LeafMNode in this schema region
List<IMeasurementMNode> leafMNodes = mtree.getAllMeasurementMNode();
- timeseriesStatistics.deleteTimeseries(leafMNodes.size());
+ schemaStatisticsManager.deleteTimeseries(leafMNodes.size());
// drop triggers with no exceptions
TriggerEngine.drop(leafMNodes);
@@ -494,7 +494,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
mNodeCache.invalidate(path.getDevicePath());
// update statistics and schemaDataTypeNumMap
- timeseriesStatistics.addTimeseries(1);
+ schemaStatisticsManager.addTimeseries(1);
// update tag index
if (offset != -1 && isRecovering) {
@@ -636,7 +636,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
mNodeCache.invalidate(prefixPath);
// update statistics and schemaDataTypeNumMap
- timeseriesStatistics.addTimeseries(plan.getMeasurements().size());
+ schemaStatisticsManager.addTimeseries(plan.getMeasurements().size());
List<Long> tagOffsets = plan.getTagOffsets();
for (int i = 0; i < measurements.size(); i++) {
@@ -784,7 +784,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
mNodeCache.invalidate(node.getPartialPath());
- timeseriesStatistics.deleteTimeseries(1);
+ schemaStatisticsManager.deleteTimeseries(1);
return storageGroupPath;
}
// endregion
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/StatisticsManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/StatisticsManager.java
new file mode 100644
index 0000000000..44d5fc1c66
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/StatisticsManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.statistics;
+
+import org.apache.iotdb.commons.path.PartialPath;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+public class StatisticsManager { // Exposes a shared instance via getInstance(); the statistics lookup below is currently a stub.
+
+ private final Map<PartialPath, TimeseriesStats> seriesToStatsMap = Maps.newConcurrentMap(); // keyed by series path; not yet read or written anywhere in this class
+
+ public long getMaxBinarySizeInBytes(PartialPath path) { // presumably an upper bound for one binary value of the series (TODO confirm); 'path' is ignored for now
+ return 512 * Byte.BYTES; // fixed placeholder: always 512 bytes, independent of the requested path
+ }
+
+ public static StatisticsManager getInstance() { // returns the single instance held by StatisticsManagerHelper
+ return StatisticsManager.StatisticsManagerHelper.INSTANCE;
+ }
+
+ private static class StatisticsManagerHelper { // nested holder for the shared StatisticsManager instance
+
+ private static final StatisticsManager INSTANCE = new StatisticsManager();
+
+ private StatisticsManagerHelper() {} // holder is not meant to be instantiated
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/TimeseriesStats.java b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/TimeseriesStats.java
new file mode 100644
index 0000000000..509341d3d5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/TimeseriesStats.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.statistics;
+
+public class TimeseriesStats {
+ // TODO: collect time series statistics; this class is currently an empty placeholder with no fields or methods.
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index b57ee14dcf..abc5ab3a4b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -22,6 +22,8 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
@@ -44,9 +46,12 @@ import org.apache.iotdb.db.mpp.execution.operator.process.last.UpdateLastCacheOp
import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -421,4 +426,64 @@ public class OperatorMemoryTest {
assertEquals(2048 * 3, linearFillOperator.calculateMaxPeekMemory());
assertEquals(1024, linearFillOperator.calculateMaxReturnSize());
}
+
+ @Test
+ public void seriesAggregationScanOperatorTest() { // checks the memory estimates reported by a SeriesAggregationScanOperator
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); // single-thread pool; shut down in the finally block below
+ try {
+ MeasurementPath measurementPath =
+ new MeasurementPath(
+ "root.SeriesAggregationScanOperatorTest.device0.sensor0", TSDataType.INT32);
+ Set<String> allSensors = Sets.newHashSet("sensor0");
+
+ QueryId queryId = new QueryId("stub_query"); // stub identifiers: only needed to build a FragmentInstanceContext
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId, SeriesAggregationScanOperatorTest.class.getSimpleName()); // registers the operator context that is fetched with get(0) below
+
+ SeriesAggregationScanOperator seriesAggregationScanOperator =
+ new SeriesAggregationScanOperator(
+ planNodeId,
+ measurementPath,
+ allSensors,
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ Arrays.asList( // three aggregators over the INT32 series: COUNT, MAX_VALUE and MIN_TIME, all in SINGLE step
+ new Aggregator(
+ AccumulatorFactory.createAccumulator(
+ AggregationType.COUNT, TSDataType.INT32, true),
+ AggregationStep.SINGLE),
+ new Aggregator(
+ AccumulatorFactory.createAccumulator(
+ AggregationType.MAX_VALUE, TSDataType.INT32, true),
+ AggregationStep.SINGLE),
+ new Aggregator(
+ AccumulatorFactory.createAccumulator(
+ AggregationType.MIN_TIME, TSDataType.INT32, true),
+ AggregationStep.SINGLE)),
+ null,
+ true,
+ null);
+
+ assertEquals( // expected peak memory: twice the configured page size plus DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES
+ 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
+ + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+ seriesAggregationScanOperator.calculateMaxPeekMemory());
+ assertEquals( // the same value is expected for the maximum return size
+ 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
+ + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+ seriesAggregationScanOperator.calculateMaxReturnSize());
+ } catch (IllegalPathException e) { // presumably raised while building the MeasurementPath (TODO confirm); treated as a test failure
+ e.printStackTrace();
+ fail();
+ } finally {
+ instanceNotificationExecutor.shutdown(); // always release the notification thread pool
+ }
+ }
}