You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2023/02/16 14:31:49 UTC
[iotdb] branch master updated: [IOTDB-5528] Refactor schema engine and region statistics (#9052)
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1a69b40c4c [IOTDB-5528] Refactor schema engine and region statistics (#9052)
1a69b40c4c is described below
commit 1a69b40c4c61d08a3ac3f4292140b3da584d9986
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Thu Feb 16 22:31:41 2023 +0800
[IOTDB-5528] Refactor schema engine and region statistics (#9052)
---
.../schemaregion/rocksdb/RSchemaRegion.java | 6 +
.../metadata/tagSchemaRegion/TagSchemaRegion.java | 6 +
.../db/metadata/mtree/MTreeBelowSGCachedImpl.java | 41 +---
.../db/metadata/mtree/MTreeBelowSGMemoryImpl.java | 42 +---
.../mtree/snapshot/MemMTreeSnapshotUtil.java | 45 +++-
.../db/metadata/mtree/store/CachedMTreeStore.java | 76 +++++--
.../iotdb/db/metadata/mtree/store/IMTreeStore.java | 8 +
.../db/metadata/mtree/store/MemMTreeStore.java | 66 ++++--
.../store/ReentrantReadOnlyCachedMTreeStore.java | 6 +
.../mtree/store/disk/cache/CacheManager.java | 11 +-
.../mtree/store/disk/cache/CacheMemoryManager.java | 54 ++++-
.../mtree/store/disk/cache/LRUCacheManager.java | 6 +-
.../mtree/store/disk/cache/PlainCacheManager.java | 5 +
...IMemManager.java => IReleaseFlushStrategy.java} | 32 +--
.../mtree/store/disk/memcontrol/MemManager.java | 86 +++++++
.../MemManagerNodeEstimatedSizeBasedImpl.java | 133 -----------
.../memcontrol/MemManagerNodeNumBasedImpl.java | 105 ---------
....java => ReleaseFlushStrategyNumBasedImpl.java} | 29 ++-
.../ReleaseFlushStrategySizeBasedImpl.java | 51 +++++
.../rescon/CachedSchemaEngineStatistics.java | 84 +++++++
.../rescon/CachedSchemaRegionStatistics.java | 94 ++++++++
.../ISchemaEngineStatistics.java} | 26 +--
.../ISchemaRegionStatistics.java} | 32 +--
...tistics.java => MemSchemaEngineStatistics.java} | 68 +++---
.../metadata/rescon/MemSchemaRegionStatistics.java | 93 ++++++++
.../SchemaEngineStatisticsHolder.java} | 24 +-
.../db/metadata/rescon/SchemaResourceManager.java | 10 +-
.../rescon/SchemaResourceManagerMetrics.java | 28 +--
.../metadata/rescon/SchemaStatisticsManager.java | 56 -----
.../db/metadata/schemaregion/ISchemaRegion.java | 5 +
.../db/metadata/schemaregion/SchemaEngine.java | 6 +-
.../schemaregion/SchemaRegionMemoryImpl.java | 33 +--
.../schemaregion/SchemaRegionSchemaFileImpl.java | 39 ++--
.../iotdb/db/metadata/mtree/MTreeBelowSGTest.java | 9 +-
.../schemaRegion/SchemaStatisticsTest.java | 252 +++++++++++++++++++++
35 files changed, 1074 insertions(+), 593 deletions(-)
diff --git a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
index 0887f34850..db7ffaa4f4 100644
--- a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
+++ b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
@@ -57,6 +57,7 @@ import org.apache.iotdb.db.metadata.query.info.IDeviceSchemaInfo;
import org.apache.iotdb.db.metadata.query.info.INodeSchemaInfo;
import org.apache.iotdb.db.metadata.query.info.ITimeSeriesSchemaInfo;
import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
+import org.apache.iotdb.db.metadata.rescon.MemSchemaRegionStatistics;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.schemaregion.SchemaRegionUtils;
import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RMNodeType;
@@ -189,6 +190,11 @@ public class RSchemaRegion implements ISchemaRegion {
// do nothing
}
+ @Override
+ public MemSchemaRegionStatistics getSchemaRegionStatistics() {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public SchemaRegionId getSchemaRegionId() {
return schemaRegionId;
diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
index 2624de345e..397bdb4e49 100644
--- a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
+++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
@@ -52,6 +52,7 @@ import org.apache.iotdb.db.metadata.query.info.IDeviceSchemaInfo;
import org.apache.iotdb.db.metadata.query.info.INodeSchemaInfo;
import org.apache.iotdb.db.metadata.query.info.ITimeSeriesSchemaInfo;
import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
+import org.apache.iotdb.db.metadata.rescon.MemSchemaRegionStatistics;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.schemaregion.SchemaRegionUtils;
import org.apache.iotdb.db.metadata.tagSchemaRegion.idtable.IDTableWithDeviceIDListImpl;
@@ -154,6 +155,11 @@ public class TagSchemaRegion implements ISchemaRegion {
// no need to record mlog
}
+ @Override
+ public MemSchemaRegionStatistics getSchemaRegionStatistics() {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public SchemaRegionId getSchemaRegionId() {
return schemaRegionId;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
index a0bb4635a2..252110ad05 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
@@ -34,7 +34,6 @@ import org.apache.iotdb.db.exception.metadata.template.DifferentTemplateExceptio
import org.apache.iotdb.db.exception.metadata.template.TemplateImcompatibeException;
import org.apache.iotdb.db.exception.metadata.template.TemplateIsInUseException;
import org.apache.iotdb.db.metadata.MetadataConstant;
-import org.apache.iotdb.db.metadata.mnode.AboveDatabaseMNode;
import org.apache.iotdb.db.metadata.mnode.IEntityMNode;
import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
@@ -61,7 +60,7 @@ import org.apache.iotdb.db.metadata.query.info.IDeviceSchemaInfo;
import org.apache.iotdb.db.metadata.query.info.INodeSchemaInfo;
import org.apache.iotdb.db.metadata.query.info.ITimeSeriesSchemaInfo;
import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
-import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
+import org.apache.iotdb.db.metadata.rescon.CachedSchemaRegionStatistics;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.metadata.utils.MetaFormatUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -116,13 +115,14 @@ public class MTreeBelowSGCachedImpl implements IMTreeBelowSG {
Function<IMeasurementMNode, Map<String, String>> tagGetter,
Runnable flushCallback,
Consumer<IMeasurementMNode> measurementProcess,
- int schemaRegionId)
+ int schemaRegionId,
+ CachedSchemaRegionStatistics regionStatistics)
throws MetadataException, IOException {
this.tagGetter = tagGetter;
- store = new CachedMTreeStore(storageGroupPath, schemaRegionId, flushCallback);
+ store = new CachedMTreeStore(storageGroupPath, schemaRegionId, regionStatistics, flushCallback);
this.storageGroupMNode = store.getRoot().getAsStorageGroupMNode();
this.storageGroupMNode.setParent(storageGroupMNode.getParent());
- this.rootNode = generatePrefix(storageGroupPath, this.storageGroupMNode);
+ this.rootNode = store.generatePrefix(storageGroupPath);
levelOfSG = storageGroupPath.getNodeLength() - 1;
// recover measurement
@@ -132,7 +132,7 @@ public class MTreeBelowSGCachedImpl implements IMTreeBelowSG {
@Override
protected Void collectMeasurement(IMeasurementMNode node) {
measurementProcess.accept(node);
- SchemaStatisticsManager.getInstance().addTimeseries(1);
+ regionStatistics.addTimeseries(1L);
return null;
}
}) {
@@ -140,28 +140,6 @@ public class MTreeBelowSGCachedImpl implements IMTreeBelowSG {
}
}
- /**
- * Generate the ancestor nodes of storageGroupNode
- *
- * @return root node
- */
- private IMNode generatePrefix(
- PartialPath storageGroupPath, IStorageGroupMNode storageGroupMNode) {
- String[] nodes = storageGroupPath.getNodes();
- // nodes[0] must be root
- IMNode root = new AboveDatabaseMNode(null, nodes[0]);
- IMNode cur = root;
- IMNode child;
- for (int i = 1; i < nodes.length - 1; i++) {
- child = new AboveDatabaseMNode(cur, nodes[i]);
- cur.addChild(nodes[i], child);
- cur = child;
- }
- storageGroupMNode.setParent(cur);
- cur.addChild(storageGroupMNode);
- return root;
- }
-
/** Only used for load snapshot */
private MTreeBelowSGCachedImpl(
PartialPath storageGroupPath,
@@ -171,7 +149,7 @@ public class MTreeBelowSGCachedImpl implements IMTreeBelowSG {
throws MetadataException {
this.store = store;
this.storageGroupMNode = store.getRoot().getAsStorageGroupMNode();
- this.rootNode = generatePrefix(storageGroupPath, this.storageGroupMNode);
+ this.rootNode = store.generatePrefix(storageGroupPath);
levelOfSG = storageGroupMNode.getPartialPath().getNodeLength() - 1;
this.tagGetter = tagGetter;
@@ -209,6 +187,7 @@ public class MTreeBelowSGCachedImpl implements IMTreeBelowSG {
File snapshotDir,
String storageGroupFullPath,
int schemaRegionId,
+ CachedSchemaRegionStatistics regionStatistics,
Consumer<IMeasurementMNode> measurementProcess,
Function<IMeasurementMNode, Map<String, String>> tagGetter,
Runnable flushCallback)
@@ -216,7 +195,7 @@ public class MTreeBelowSGCachedImpl implements IMTreeBelowSG {
return new MTreeBelowSGCachedImpl(
new PartialPath(storageGroupFullPath),
CachedMTreeStore.loadFromSnapshot(
- snapshotDir, storageGroupFullPath, schemaRegionId, flushCallback),
+ snapshotDir, storageGroupFullPath, schemaRegionId, regionStatistics, flushCallback),
measurementProcess,
tagGetter);
}
@@ -1038,8 +1017,6 @@ public class MTreeBelowSGCachedImpl implements IMTreeBelowSG {
rootNode, showTimeSeriesPlan.getPath(), store, showTimeSeriesPlan.isPrefixMatch()) {
@Override
protected ITimeSeriesSchemaInfo collectMeasurement(IMeasurementMNode node) {
- Pair<Map<String, String>, Map<String, String>> tagAndAttribute =
- tagAndAttributeProvider.apply(node.getOffset());
return new ITimeSeriesSchemaInfo() {
private Pair<Map<String, String>, Map<String, String>> tagAndAttribute = null;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
index e17f76d52b..ad834af5b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
@@ -34,7 +34,6 @@ import org.apache.iotdb.db.exception.metadata.template.DifferentTemplateExceptio
import org.apache.iotdb.db.exception.metadata.template.TemplateImcompatibeException;
import org.apache.iotdb.db.exception.metadata.template.TemplateIsInUseException;
import org.apache.iotdb.db.metadata.MetadataConstant;
-import org.apache.iotdb.db.metadata.mnode.AboveDatabaseMNode;
import org.apache.iotdb.db.metadata.mnode.IEntityMNode;
import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
@@ -61,6 +60,7 @@ import org.apache.iotdb.db.metadata.query.info.IDeviceSchemaInfo;
import org.apache.iotdb.db.metadata.query.info.INodeSchemaInfo;
import org.apache.iotdb.db.metadata.query.info.ITimeSeriesSchemaInfo;
import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
+import org.apache.iotdb.db.metadata.rescon.MemSchemaRegionStatistics;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.metadata.utils.MetaFormatUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -114,10 +114,10 @@ public class MTreeBelowSGMemoryImpl implements IMTreeBelowSG {
public MTreeBelowSGMemoryImpl(
PartialPath storageGroupPath,
Function<IMeasurementMNode, Map<String, String>> tagGetter,
- int schemaRegionId) {
- store = new MemMTreeStore(storageGroupPath, true);
+ MemSchemaRegionStatistics regionStatistics) {
+ store = new MemMTreeStore(storageGroupPath, true, regionStatistics);
this.storageGroupMNode = store.getRoot().getAsStorageGroupMNode();
- this.rootNode = generatePrefix(storageGroupPath, this.storageGroupMNode);
+ this.rootNode = store.generatePrefix(storageGroupPath);
levelOfSG = storageGroupPath.getNodeLength() - 1;
this.tagGetter = tagGetter;
}
@@ -125,37 +125,14 @@ public class MTreeBelowSGMemoryImpl implements IMTreeBelowSG {
private MTreeBelowSGMemoryImpl(
PartialPath storageGroupPath,
MemMTreeStore store,
- Function<IMeasurementMNode, Map<String, String>> tagGetter,
- int schemaRegionId) {
+ Function<IMeasurementMNode, Map<String, String>> tagGetter) {
this.store = store;
this.storageGroupMNode = store.getRoot().getAsStorageGroupMNode();
- this.rootNode = generatePrefix(storageGroupPath, this.storageGroupMNode);
+ this.rootNode = store.generatePrefix(storageGroupPath);
levelOfSG = storageGroupPath.getNodeLength() - 1;
this.tagGetter = tagGetter;
}
- /**
- * Generate the ancestor nodes of storageGroupNode
- *
- * @return root node
- */
- private IMNode generatePrefix(
- PartialPath storageGroupPath, IStorageGroupMNode storageGroupMNode) {
- String[] nodes = storageGroupPath.getNodes();
- // nodes[0] must be root
- IMNode root = new AboveDatabaseMNode(null, nodes[0]);
- IMNode cur = root;
- IMNode child;
- for (int i = 1; i < nodes.length - 1; i++) {
- child = new AboveDatabaseMNode(cur, nodes[i]);
- cur.addChild(nodes[i], child);
- cur = child;
- }
- storageGroupMNode.setParent(cur);
- cur.addChild(storageGroupMNode);
- return root;
- }
-
@Override
public void clear() {
store.clear();
@@ -175,15 +152,14 @@ public class MTreeBelowSGMemoryImpl implements IMTreeBelowSG {
public static MTreeBelowSGMemoryImpl loadFromSnapshot(
File snapshotDir,
String storageGroupFullPath,
- int schemaRegionId,
+ MemSchemaRegionStatistics regionStatistics,
Consumer<IMeasurementMNode> measurementProcess,
Function<IMeasurementMNode, Map<String, String>> tagGetter)
throws IOException, IllegalPathException {
return new MTreeBelowSGMemoryImpl(
new PartialPath(storageGroupFullPath),
- MemMTreeStore.loadFromSnapshot(snapshotDir, measurementProcess),
- tagGetter,
- schemaRegionId);
+ MemMTreeStore.loadFromSnapshot(snapshotDir, measurementProcess, regionStatistics),
+ tagGetter);
}
// endregion
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/snapshot/MemMTreeSnapshotUtil.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/snapshot/MemMTreeSnapshotUtil.java
index 6306d8eaeb..b650dc710c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/snapshot/MemMTreeSnapshotUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/snapshot/MemMTreeSnapshotUtil.java
@@ -33,7 +33,7 @@ import org.apache.iotdb.db.metadata.mnode.estimator.IMNodeSizeEstimator;
import org.apache.iotdb.db.metadata.mnode.iterator.IMNodeIterator;
import org.apache.iotdb.db.metadata.mnode.visitor.MNodeVisitor;
import org.apache.iotdb.db.metadata.mtree.store.MemMTreeStore;
-import org.apache.iotdb.db.metadata.rescon.MemoryStatistics;
+import org.apache.iotdb.db.metadata.rescon.MemSchemaRegionStatistics;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -67,7 +67,6 @@ public class MemMTreeSnapshotUtil {
"Error occurred during deserializing MemMTree.";
private static final byte VERSION = 0;
- private static final MemoryStatistics MEMORY_STATISTICS = MemoryStatistics.getInstance();
private static final IMNodeSizeEstimator ESTIMATOR = new BasicMNodSizeEstimator();
public static boolean createSnapshot(File snapshotDir, MemMTreeStore store) {
@@ -106,15 +105,18 @@ public class MemMTreeSnapshotUtil {
}
public static IMNode loadSnapshot(
- File snapshotDir, Consumer<IMeasurementMNode> measurementProcess) throws IOException {
+ File snapshotDir,
+ Consumer<IMeasurementMNode> measurementProcess,
+ MemSchemaRegionStatistics regionStatistics)
+ throws IOException {
File snapshot =
SystemFileFactory.INSTANCE.getFile(snapshotDir, MetadataConstant.MTREE_SNAPSHOT);
try (BufferedInputStream inputStream = new BufferedInputStream(new FileInputStream(snapshot))) {
- return deserializeFrom(inputStream, measurementProcess);
+ return deserializeFrom(inputStream, measurementProcess, regionStatistics);
} catch (Throwable e) {
// This method is only invoked during recovery. If failed, the memory usage should be cleared
// since the loaded schema will not be used.
- MEMORY_STATISTICS.clear();
+ regionStatistics.clear();
throw e;
}
}
@@ -153,17 +155,29 @@ public class MemMTreeSnapshotUtil {
}
private static IMNode deserializeFrom(
- InputStream inputStream, Consumer<IMeasurementMNode> measurementProcess) throws IOException {
+ InputStream inputStream,
+ Consumer<IMeasurementMNode> measurementProcess,
+ MemSchemaRegionStatistics regionStatistics)
+ throws IOException {
byte version = ReadWriteIOUtils.readByte(inputStream);
- return inorderDeserialize(inputStream, measurementProcess);
+ return inorderDeserialize(inputStream, measurementProcess, regionStatistics);
}
private static IMNode inorderDeserialize(
- InputStream inputStream, Consumer<IMeasurementMNode> measurementProcess) throws IOException {
+ InputStream inputStream,
+ Consumer<IMeasurementMNode> measurementProcess,
+ MemSchemaRegionStatistics regionStatistics)
+ throws IOException {
MNodeDeserializer deserializer = new MNodeDeserializer();
Deque<IMNode> ancestors = new ArrayDeque<>();
Deque<Integer> restChildrenNum = new ArrayDeque<>();
- deserializeMNode(ancestors, restChildrenNum, deserializer, inputStream, measurementProcess);
+ deserializeMNode(
+ ancestors,
+ restChildrenNum,
+ deserializer,
+ inputStream,
+ measurementProcess,
+ regionStatistics);
int childrenNum;
IMNode root = ancestors.peek();
while (!ancestors.isEmpty()) {
@@ -172,7 +186,13 @@ public class MemMTreeSnapshotUtil {
ancestors.pop();
} else {
restChildrenNum.push(childrenNum - 1);
- deserializeMNode(ancestors, restChildrenNum, deserializer, inputStream, measurementProcess);
+ deserializeMNode(
+ ancestors,
+ restChildrenNum,
+ deserializer,
+ inputStream,
+ measurementProcess,
+ regionStatistics);
}
}
return root;
@@ -183,7 +203,8 @@ public class MemMTreeSnapshotUtil {
Deque<Integer> restChildrenNum,
MNodeDeserializer deserializer,
InputStream inputStream,
- Consumer<IMeasurementMNode> measurementProcess)
+ Consumer<IMeasurementMNode> measurementProcess,
+ MemSchemaRegionStatistics regionStatistics)
throws IOException {
byte type = ReadWriteIOUtils.readByte(inputStream);
IMNode node;
@@ -214,7 +235,7 @@ public class MemMTreeSnapshotUtil {
throw new IOException("Unrecognized MNode type " + type);
}
- MEMORY_STATISTICS.requestMemory(ESTIMATOR.estimateSize(node));
+ regionStatistics.requestMemory(ESTIMATOR.estimateSize(node));
if (!ancestors.isEmpty()) {
node.setParent(ancestors.peek());
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java
index 6db80aeafa..b0ad92cb8d 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.metadata.mtree.store;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.exception.metadata.cache.MNodeNotCachedException;
+import org.apache.iotdb.db.metadata.mnode.AboveDatabaseMNode;
import org.apache.iotdb.db.metadata.mnode.IEntityMNode;
import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
@@ -33,10 +34,10 @@ import org.apache.iotdb.db.metadata.mnode.iterator.IMNodeIterator;
import org.apache.iotdb.db.metadata.mtree.store.disk.ICachedMNodeContainer;
import org.apache.iotdb.db.metadata.mtree.store.disk.cache.CacheMemoryManager;
import org.apache.iotdb.db.metadata.mtree.store.disk.cache.ICacheManager;
-import org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol.IMemManager;
-import org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol.MemManagerHolder;
+import org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol.MemManager;
import org.apache.iotdb.db.metadata.mtree.store.disk.schemafile.ISchemaFile;
import org.apache.iotdb.db.metadata.mtree.store.disk.schemafile.SchemaFile;
+import org.apache.iotdb.db.metadata.rescon.CachedSchemaRegionStatistics;
import org.apache.iotdb.db.metadata.template.Template;
import org.slf4j.Logger;
@@ -55,10 +56,9 @@ public class CachedMTreeStore implements IMTreeStore {
private static final Logger logger = LoggerFactory.getLogger(CachedMTreeStore.class);
- private final IMemManager memManager = MemManagerHolder.getMemManagerInstance();
+ private final MemManager memManager;
- private final ICacheManager cacheManager =
- CacheMemoryManager.getInstance().createLRUCacheManager(this);
+ private final ICacheManager cacheManager;
private ISchemaFile file;
@@ -66,17 +66,43 @@ public class CachedMTreeStore implements IMTreeStore {
private final Runnable flushCallback;
+ private final CachedSchemaRegionStatistics regionStatistics;
+
private final StampedWriterPreferredLock lock = new StampedWriterPreferredLock();
- public CachedMTreeStore(PartialPath storageGroup, int schemaRegionId, Runnable flushCallback)
+ public CachedMTreeStore(
+ PartialPath storageGroup,
+ int schemaRegionId,
+ CachedSchemaRegionStatistics regionStatistics,
+ Runnable flushCallback)
throws MetadataException, IOException {
file = SchemaFile.initSchemaFile(storageGroup.getFullPath(), schemaRegionId);
root = file.init();
- cacheManager.initRootStatus(root);
+ this.regionStatistics = regionStatistics;
+ this.memManager = new MemManager(regionStatistics);
this.flushCallback = flushCallback;
+ this.cacheManager = CacheMemoryManager.getInstance().createLRUCacheManager(this, memManager);
+ cacheManager.initRootStatus(root);
ensureMemoryStatus();
}
+ @Override
+ public IMNode generatePrefix(PartialPath storageGroupPath) {
+ String[] nodes = storageGroupPath.getNodes();
+ // nodes[0] must be root
+ IMNode res = new AboveDatabaseMNode(null, nodes[0]);
+ IMNode cur = res;
+ IMNode child;
+ for (int i = 1; i < nodes.length - 1; i++) {
+ child = new AboveDatabaseMNode(cur, nodes[i]);
+ cur.addChild(nodes[i], child);
+ cur = child;
+ }
+ root.setParent(cur);
+ cur.addChild(root);
+ return res;
+ }
+
@Override
public IMNode getRoot() {
return root;
@@ -455,18 +481,30 @@ public class CachedMTreeStore implements IMTreeStore {
}
public static CachedMTreeStore loadFromSnapshot(
- File snapshotDir, String storageGroup, int schemaRegionId, Runnable flushCallback)
+ File snapshotDir,
+ String storageGroup,
+ int schemaRegionId,
+ CachedSchemaRegionStatistics regionStatistics,
+ Runnable flushCallback)
throws IOException, MetadataException {
- return new CachedMTreeStore(snapshotDir, storageGroup, schemaRegionId, flushCallback);
+ return new CachedMTreeStore(
+ snapshotDir, storageGroup, schemaRegionId, regionStatistics, flushCallback);
}
private CachedMTreeStore(
- File snapshotDir, String storageGroup, int schemaRegionId, Runnable flushCallback)
+ File snapshotDir,
+ String storageGroup,
+ int schemaRegionId,
+ CachedSchemaRegionStatistics regionStatistics,
+ Runnable flushCallback)
throws IOException, MetadataException {
file = SchemaFile.loadSnapshot(snapshotDir, storageGroup, schemaRegionId);
root = file.init();
- cacheManager.initRootStatus(root);
+ this.regionStatistics = regionStatistics;
+ this.memManager = new MemManager(regionStatistics);
this.flushCallback = flushCallback;
+ this.cacheManager = CacheMemoryManager.getInstance().createLRUCacheManager(this, memManager);
+ cacheManager.initRootStatus(root);
ensureMemoryStatus();
}
@@ -481,12 +519,14 @@ public class CachedMTreeStore implements IMTreeStore {
/**
* Keep fetching evictable nodes from cacheManager until the memory status is under safe mode or
* no node could be evicted. Update the memory status after evicting each node.
+ *
+ * @return true if releasing should stop, i.e. the region's cached memory size is zero or no node could be evicted
*/
- public void executeMemoryRelease() {
- while (memManager.isExceedReleaseThreshold() && !memManager.isEmpty()) {
- if (!cacheManager.evict()) {
- break;
- }
+ public boolean executeMemoryRelease() {
+ if (regionStatistics.getCachedMemorySize() != 0) {
+ return !cacheManager.evict();
+ } else {
+ return true;
}
}
@@ -523,7 +563,9 @@ public class CachedMTreeStore implements IMTreeStore {
}
} catch (Throwable e) {
logger.error(
- "Error occurred during MTree flush, current SchemaRegion is {}", root.getFullPath(), e);
+ "Error occurred during MTree flush, current SchemaRegionId is {}",
+ regionStatistics.getSchemaRegionId(),
+ e);
e.printStackTrace();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/IMTreeStore.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/IMTreeStore.java
index 6fa23f7eaa..e15311927f 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/IMTreeStore.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/IMTreeStore.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.metadata.mtree.store;
import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.metadata.mnode.IEntityMNode;
import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
@@ -35,6 +36,13 @@ import java.util.Map;
*/
public interface IMTreeStore {
+ /**
+ * Generate the ancestor nodes of storageGroupNode
+ *
+ * @return root node
+ */
+ IMNode generatePrefix(PartialPath storageGroupPath);
+
IMNode getRoot();
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/MemMTreeStore.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/MemMTreeStore.java
index 01705752c5..49b92f7861 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/MemMTreeStore.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/MemMTreeStore.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.mnode.AboveDatabaseMNode;
import org.apache.iotdb.db.metadata.mnode.IEntityMNode;
import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
@@ -35,24 +36,23 @@ import org.apache.iotdb.db.metadata.mnode.iterator.IMNodeIterator;
import org.apache.iotdb.db.metadata.mnode.iterator.MNodeIterator;
import org.apache.iotdb.db.metadata.mnode.iterator.MemoryTraverserIterator;
import org.apache.iotdb.db.metadata.mtree.snapshot.MemMTreeSnapshotUtil;
-import org.apache.iotdb.db.metadata.rescon.MemoryStatistics;
+import org.apache.iotdb.db.metadata.rescon.MemSchemaRegionStatistics;
import org.apache.iotdb.db.metadata.template.Template;
import java.io.File;
import java.io.IOException;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
/** This is a memory-based implementation of IMTreeStore. All MNodes are stored in memory. */
public class MemMTreeStore implements IMTreeStore {
- private MemoryStatistics memoryStatistics = MemoryStatistics.getInstance();
- private IMNodeSizeEstimator estimator = new BasicMNodSizeEstimator();
- private AtomicLong localMemoryUsage = new AtomicLong(0);
+ private final IMNodeSizeEstimator estimator = new BasicMNodSizeEstimator();
+ private MemSchemaRegionStatistics regionStatistics;
private IMNode root;
+ // Only used for ConfigMTree
public MemMTreeStore(PartialPath rootPath, boolean isStorageGroup) {
if (isStorageGroup) {
this.root =
@@ -65,8 +65,41 @@ public class MemMTreeStore implements IMTreeStore {
}
}
- private MemMTreeStore(IMNode root) {
+ public MemMTreeStore(
+ PartialPath rootPath, boolean isStorageGroup, MemSchemaRegionStatistics regionStatistics) {
+ if (isStorageGroup) {
+ this.root =
+ new StorageGroupMNode(
+ null,
+ rootPath.getTailNode(),
+ CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs());
+ } else {
+ this.root = new InternalMNode(null, IoTDBConstant.PATH_ROOT);
+ }
+ this.regionStatistics = regionStatistics;
+ }
+
+ private MemMTreeStore(IMNode root, MemSchemaRegionStatistics regionStatistics) {
this.root = root;
+ this.regionStatistics = regionStatistics;
+ }
+
+ @Override
+ public IMNode generatePrefix(PartialPath storageGroupPath) {
+ String[] nodes = storageGroupPath.getNodes();
+ // nodes[0] must be root
+ IMNode res = new AboveDatabaseMNode(null, nodes[0]);
+ IMNode cur = res;
+ IMNode child;
+ for (int i = 1; i < nodes.length - 1; i++) {
+ child = new AboveDatabaseMNode(cur, nodes[i]);
+ cur.addChild(nodes[i], child);
+ cur = child;
+ }
+ root.setParent(cur);
+ cur.addChild(root);
+ requestMemory(estimator.estimateSize(root));
+ return res;
}
@Override
@@ -186,8 +219,6 @@ public class MemMTreeStore implements IMTreeStore {
@Override
public void clear() {
root = new InternalMNode(null, IoTDBConstant.PATH_ROOT);
- memoryStatistics.releaseMemory(localMemoryUsage.get());
- localMemoryUsage.set(0);
}
@Override
@@ -196,17 +227,24 @@ public class MemMTreeStore implements IMTreeStore {
}
public static MemMTreeStore loadFromSnapshot(
- File snapshotDir, Consumer<IMeasurementMNode> measurementProcess) throws IOException {
- return new MemMTreeStore(MemMTreeSnapshotUtil.loadSnapshot(snapshotDir, measurementProcess));
+ File snapshotDir,
+ Consumer<IMeasurementMNode> measurementProcess,
+ MemSchemaRegionStatistics regionStatistics)
+ throws IOException {
+ return new MemMTreeStore(
+ MemMTreeSnapshotUtil.loadSnapshot(snapshotDir, measurementProcess, regionStatistics),
+ regionStatistics);
}
private void requestMemory(int size) {
- memoryStatistics.requestMemory(size);
- localMemoryUsage.getAndUpdate(v -> v += size);
+ if (regionStatistics != null) {
+ regionStatistics.requestMemory(size);
+ }
}
private void releaseMemory(int size) {
- localMemoryUsage.getAndUpdate(v -> v -= size);
- memoryStatistics.releaseMemory(size);
+ if (regionStatistics != null) {
+ regionStatistics.releaseMemory(size);
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/ReentrantReadOnlyCachedMTreeStore.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/ReentrantReadOnlyCachedMTreeStore.java
index dbea2e663a..0c7df571a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/ReentrantReadOnlyCachedMTreeStore.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/ReentrantReadOnlyCachedMTreeStore.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.metadata.mtree.store;
import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.metadata.mnode.IEntityMNode;
import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
@@ -37,6 +38,11 @@ public class ReentrantReadOnlyCachedMTreeStore implements IMTreeStore {
this.readLockStamp = store.stampedReadLock();
}
+ @Override
+ public IMNode generatePrefix(PartialPath storageGroupPath) {
+ throw new UnsupportedOperationException("ReadOnlyReentrantMTreeStore");
+ }
+
@Override
public IMNode getRoot() {
return store.getRoot();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheManager.java
index 4e9fb1819f..3d59cb1263 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheManager.java
@@ -23,8 +23,7 @@ import org.apache.iotdb.db.exception.metadata.cache.MNodeNotPinnedException;
import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
import org.apache.iotdb.db.metadata.mtree.store.disk.ICachedMNodeContainer;
-import org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol.IMemManager;
-import org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol.MemManagerHolder;
+import org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol.MemManager;
import java.util.ArrayList;
import java.util.Iterator;
@@ -65,10 +64,14 @@ import static org.apache.iotdb.db.metadata.mtree.store.disk.ICachedMNodeContaine
*/
public abstract class CacheManager implements ICacheManager {
- private IMemManager memManager = MemManagerHolder.getMemManagerInstance();
+ private final MemManager memManager;
// The nodeBuffer helps to quickly locate the volatile subtree
- private NodeBuffer nodeBuffer = new NodeBuffer();
+ private final NodeBuffer nodeBuffer = new NodeBuffer();
+
+ public CacheManager(MemManager memManager) {
+ this.memManager = memManager;
+ }
public void initRootStatus(IMNode root) {
pinMNodeWithMemStatusUpdate(root);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java
index 6ba8d98436..8ac11408b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java
@@ -20,9 +20,14 @@ package org.apache.iotdb.db.metadata.mtree.store.disk.cache;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.metadata.mtree.store.CachedMTreeStore;
-import org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol.IMemManager;
-import org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol.MemManagerHolder;
+import org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol.IReleaseFlushStrategy;
+import org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol.MemManager;
+import org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol.ReleaseFlushStrategyNumBasedImpl;
+import org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol.ReleaseFlushStrategySizeBasedImpl;
+import org.apache.iotdb.db.metadata.rescon.CachedSchemaEngineStatistics;
+import org.apache.iotdb.db.metadata.rescon.SchemaEngineStatisticsHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +48,7 @@ public class CacheMemoryManager {
private final List<CachedMTreeStore> storeList = new ArrayList<>();
- private final IMemManager memManager = MemManagerHolder.getMemManagerInstance();
+ private CachedSchemaEngineStatistics engineStatistics;
private static final int CONCURRENT_NUM = 10;
@@ -56,6 +61,8 @@ public class CacheMemoryManager {
private volatile boolean hasReleaseTask;
private int releaseCount = 0;
+ private IReleaseFlushStrategy releaseFlushStrategy;
+
private static final int MAX_WAITING_TIME_WHEN_RELEASING = 10_000;
private final Object blockObject = new Object();
@@ -65,15 +72,23 @@ public class CacheMemoryManager {
* @param store CachedMTreeStore
* @return LRUCacheManager
*/
- public ICacheManager createLRUCacheManager(CachedMTreeStore store) {
+ public ICacheManager createLRUCacheManager(CachedMTreeStore store, MemManager memManager) {
synchronized (storeList) {
- ICacheManager cacheManager = new LRUCacheManager();
+ ICacheManager cacheManager = new LRUCacheManager(memManager);
storeList.add(store);
return cacheManager;
}
}
public void init() {
+ engineStatistics =
+ SchemaEngineStatisticsHolder.getSchemaEngineStatistics()
+ .getAsCachedSchemaEngineStatistics();
+ if (IoTDBDescriptor.getInstance().getConfig().getCachedMNodeSizeInSchemaFileMode() >= 0) {
+ releaseFlushStrategy = new ReleaseFlushStrategyNumBasedImpl(engineStatistics);
+ } else {
+ releaseFlushStrategy = new ReleaseFlushStrategySizeBasedImpl(engineStatistics);
+ }
flushTaskExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(
CONCURRENT_NUM, ThreadName.SCHEMA_REGION_FLUSH_POOL.getName());
@@ -82,12 +97,20 @@ public class CacheMemoryManager {
CONCURRENT_NUM, ThreadName.SCHEMA_REGION_RELEASE_POOL.getName());
}
+ public boolean isExceedReleaseThreshold() {
+ return releaseFlushStrategy.isExceedReleaseThreshold();
+ }
+
+ public boolean isExceedFlushThreshold() {
+ return releaseFlushStrategy.isExceedFlushThreshold();
+ }
+
/**
* Check the current memory usage. If the release threshold is exceeded, trigger the task to
* perform an internal and external memory swap to release the memory.
*/
public void ensureMemoryStatus() {
- if (memManager.isExceedReleaseThreshold() && !hasReleaseTask) {
+ if (isExceedReleaseThreshold() && !hasReleaseTask) {
registerReleaseTask();
}
}
@@ -143,7 +166,7 @@ public class CacheMemoryManager {
() -> {
store.getLock().threadReadLock();
try {
- store.executeMemoryRelease();
+ executeMemoryRelease(store);
} finally {
store.getLock().threadReadUnlock();
}
@@ -154,7 +177,7 @@ public class CacheMemoryManager {
releaseCount++;
synchronized (blockObject) {
hasReleaseTask = false;
- if (memManager.isExceedFlushThreshold() && !hasFlushTask) {
+ if (isExceedFlushThreshold() && !hasFlushTask) {
registerFlushTask();
} else {
blockObject.notifyAll();
@@ -163,6 +186,16 @@ public class CacheMemoryManager {
}
}
+ private void executeMemoryRelease(CachedMTreeStore store) {
+ while (isExceedReleaseThreshold()) {
+ // keep asking the store to release memory while the release threshold is still exceeded
+ if (store.executeMemoryRelease()) {
+ // if the store cannot release any more memory, break
+ break;
+ }
+ }
+ }
+
private synchronized void registerFlushTask() {
if (hasFlushTask) {
return;
@@ -192,7 +225,7 @@ public class CacheMemoryManager {
store.getLock().writeLock();
try {
store.flushVolatileNodes();
- store.executeMemoryRelease();
+ executeMemoryRelease(store);
} finally {
store.getLock().unlockWrite();
}
@@ -230,6 +263,9 @@ public class CacheMemoryManager {
}
flushTaskExecutor = null;
}
+ storeList.clear();
+ releaseFlushStrategy = null;
+ engineStatistics = null;
}
private CacheMemoryManager() {}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/LRUCacheManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/LRUCacheManager.java
index 8a4be4d5cb..67da4bd27c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/LRUCacheManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/LRUCacheManager.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.metadata.mtree.store.disk.cache;
import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol.MemManager;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -27,9 +28,10 @@ public class LRUCacheManager extends CacheManager {
private static final int NUM_OF_LIST = 17;
- private LRUCacheList[] lruCacheLists = new LRUCacheList[NUM_OF_LIST];
+ private final LRUCacheList[] lruCacheLists = new LRUCacheList[NUM_OF_LIST];
- public LRUCacheManager() {
+ public LRUCacheManager(MemManager memManager) {
+ super(memManager);
for (int i = 0; i < NUM_OF_LIST; i++) {
lruCacheLists[i] = new LRUCacheList();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/PlainCacheManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/PlainCacheManager.java
index 0ada5dc0d3..0c793c3f30 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/PlainCacheManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/PlainCacheManager.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.metadata.mtree.store.disk.cache;
import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol.MemManager;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -30,6 +31,10 @@ public class PlainCacheManager extends CacheManager {
// eviction.
private volatile Map<CacheEntry, IMNode> nodeCache = new ConcurrentHashMap<>();
+ public PlainCacheManager(MemManager memManager) {
+ super(memManager);
+ }
+
@Override
protected void updateCacheStatusAfterAccess(CacheEntry cacheEntry) {}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/IMemManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/IReleaseFlushStrategy.java
similarity index 66%
copy from server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/IMemManager.java
copy to server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/IReleaseFlushStrategy.java
index 5837852a8f..2b975b778d 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/IMemManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/IReleaseFlushStrategy.java
@@ -18,35 +18,11 @@
*/
package org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol;
-import org.apache.iotdb.db.metadata.mnode.IMNode;
-
-import java.util.List;
-
-public interface IMemManager {
-
- void init();
-
- boolean isEmpty();
-
+/** This interface defines the threshold strategy for release and flush task */
+public interface IReleaseFlushStrategy {
+ /** Check if exceed release threshold */
boolean isExceedReleaseThreshold();
+ /** Check if exceed flush threshold */
boolean isExceedFlushThreshold();
-
- void requestPinnedMemResource(IMNode node);
-
- void upgradeMemResource(IMNode node);
-
- void releasePinnedMemResource(IMNode node);
-
- void releaseMemResource(IMNode node);
-
- void releaseMemResource(List<IMNode> evictedNodes);
-
- void updatePinnedSize(int deltaSize);
-
- void clear();
-
- long getPinnedSize();
-
- long getCachedSize();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/MemManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/MemManager.java
new file mode 100644
index 0000000000..a8216725d4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/MemManager.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol;
+
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.rescon.CachedSchemaRegionStatistics;
+
+import java.util.List;
+
+// This class records a schema region's memory size and MNode count changes in its statistics.
+public class MemManager {
+
+ private final CachedSchemaRegionStatistics regionStatistics;
+
+ private final CachedMNodeSizeEstimator estimator = new CachedMNodeSizeEstimator();
+
+ public MemManager(CachedSchemaRegionStatistics regionStatistics) {
+ this.regionStatistics = regionStatistics;
+ }
+
+ public void requestPinnedMemResource(IMNode node) {
+ int size = estimator.estimateSize(node);
+ regionStatistics.requestMemory(size);
+ regionStatistics.updatePinnedMemorySize(size);
+ regionStatistics.updatePinnedMNodeNum(1);
+ }
+
+ public void upgradeMemResource(IMNode node) {
+ int size = estimator.estimateSize(node);
+ regionStatistics.updatePinnedMemorySize(size);
+ regionStatistics.updatePinnedMNodeNum(1);
+ regionStatistics.updateUnpinnedMemorySize(-size);
+ regionStatistics.updateUnpinnedMNodeNum(-1);
+ }
+
+ public void releasePinnedMemResource(IMNode node) {
+ int size = estimator.estimateSize(node);
+ regionStatistics.updateUnpinnedMemorySize(size);
+ regionStatistics.updateUnpinnedMNodeNum(1);
+ regionStatistics.updatePinnedMemorySize(-size);
+ regionStatistics.updatePinnedMNodeNum(-1);
+ }
+
+ public void releaseMemResource(IMNode node) {
+ int size = estimator.estimateSize(node);
+ regionStatistics.updateUnpinnedMemorySize(-size);
+ regionStatistics.updateUnpinnedMNodeNum(-1);
+ regionStatistics.releaseMemory(size);
+ }
+
+ public void releaseMemResource(List<IMNode> evictedNodes) {
+ int size = 0;
+ for (IMNode node : evictedNodes) {
+ size += estimator.estimateSize(node);
+ }
+ regionStatistics.updateUnpinnedMNodeNum(-evictedNodes.size());
+ regionStatistics.updateUnpinnedMemorySize(-size);
+ regionStatistics.releaseMemory(size);
+ }
+
+ public void updatePinnedSize(int deltaSize) {
+ if (deltaSize > 0) {
+ regionStatistics.requestMemory(deltaSize);
+ } else {
+ regionStatistics.releaseMemory(-deltaSize);
+ }
+ regionStatistics.updatePinnedMemorySize(deltaSize);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/MemManagerNodeEstimatedSizeBasedImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/MemManagerNodeEstimatedSizeBasedImpl.java
deleted file mode 100644
index 0f87936e97..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/MemManagerNodeEstimatedSizeBasedImpl.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol;
-
-import org.apache.iotdb.db.metadata.mnode.IMNode;
-import org.apache.iotdb.db.metadata.rescon.MemoryStatistics;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-// This class is used for memory control in industry environment.
-public class MemManagerNodeEstimatedSizeBasedImpl implements IMemManager {
-
- private static final double RELEASE_THRESHOLD_RATIO = 0.6;
- private static final double FLUSH_THRESHOLD_RATION = 0.75;
-
- private final MemoryStatistics memoryStatistics = MemoryStatistics.getInstance();
-
- private long releaseThreshold;
- private long flushThreshold;
-
- // memory size
- private final AtomicLong size = new AtomicLong(0);
-
- private final AtomicLong pinnedSize = new AtomicLong(0);
-
- private final CachedMNodeSizeEstimator estimator = new CachedMNodeSizeEstimator();
-
- @Override
- public void init() {
- size.getAndSet(0);
- pinnedSize.getAndSet(0);
- releaseThreshold = (long) (memoryStatistics.getMemoryCapacity() * RELEASE_THRESHOLD_RATIO);
- flushThreshold = (long) (memoryStatistics.getMemoryCapacity() * FLUSH_THRESHOLD_RATION);
- }
-
- @Override
- public boolean isEmpty() {
- return size.get() == 0;
- }
-
- @Override
- public boolean isExceedReleaseThreshold() {
- return memoryStatistics.getMemoryUsage() > releaseThreshold;
- }
-
- @Override
- public boolean isExceedFlushThreshold() {
- return memoryStatistics.getMemoryUsage() > flushThreshold;
- }
-
- @Override
- public void requestPinnedMemResource(IMNode node) {
- int size = estimator.estimateSize(node);
- memoryStatistics.requestMemory(size);
- pinnedSize.getAndUpdate(v -> v += size);
- }
-
- @Override
- public void upgradeMemResource(IMNode node) {
- int size = estimator.estimateSize(node);
- pinnedSize.getAndUpdate(v -> v += size);
- this.size.getAndUpdate(v -> v -= size);
- }
-
- @Override
- public void releasePinnedMemResource(IMNode node) {
- int size = estimator.estimateSize(node);
- this.size.getAndUpdate(v -> v += size);
- pinnedSize.getAndUpdate(v -> v -= size);
- }
-
- @Override
- public void releaseMemResource(IMNode node) {
- int size = estimator.estimateSize(node);
- this.size.getAndUpdate(v -> v -= size);
- memoryStatistics.releaseMemory(size);
- }
-
- @Override
- public void releaseMemResource(List<IMNode> evictedNodes) {
- int size = 0;
- for (IMNode node : evictedNodes) {
- size += estimator.estimateSize(node);
- }
- int finalSize = size;
- this.size.getAndUpdate(v -> v -= finalSize);
- memoryStatistics.releaseMemory(size);
- }
-
- @Override
- public void updatePinnedSize(int deltaSize) {
- if (deltaSize > 0) {
- memoryStatistics.requestMemory(deltaSize);
- } else {
- memoryStatistics.releaseMemory(deltaSize);
- }
- pinnedSize.getAndUpdate(v -> v += deltaSize);
- }
-
- @Override
- public void clear() {
- size.getAndSet(0);
- pinnedSize.getAndSet(0);
- }
-
- @Override
- public long getPinnedSize() {
- return pinnedSize.get();
- }
-
- @Override
- public long getCachedSize() {
- return size.get();
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/MemManagerNodeNumBasedImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/MemManagerNodeNumBasedImpl.java
deleted file mode 100644
index 993fcfee4f..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/MemManagerNodeNumBasedImpl.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol;
-
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.metadata.mnode.IMNode;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-// This class is used for memory control in dev and debug environment.
-public class MemManagerNodeNumBasedImpl implements IMemManager {
-
- private int capacity;
-
- private final AtomicInteger size = new AtomicInteger(0);
-
- private final AtomicInteger pinnedSize = new AtomicInteger(0);
-
- MemManagerNodeNumBasedImpl() {}
-
- @Override
- public void init() {
- capacity = IoTDBDescriptor.getInstance().getConfig().getCachedMNodeSizeInSchemaFileMode();
- }
-
- @Override
- public boolean isEmpty() {
- return size.get() == 0;
- }
-
- @Override
- public boolean isExceedReleaseThreshold() {
- return size.get() + pinnedSize.get() > capacity * 0.6;
- }
-
- @Override
- public boolean isExceedFlushThreshold() {
- return size.get() + pinnedSize.get() > capacity;
- }
-
- @Override
- public void requestPinnedMemResource(IMNode node) {
- pinnedSize.getAndIncrement();
- }
-
- @Override
- public void upgradeMemResource(IMNode node) {
- pinnedSize.getAndIncrement();
- size.getAndDecrement();
- }
-
- @Override
- public void releasePinnedMemResource(IMNode node) {
- size.getAndIncrement();
- pinnedSize.getAndDecrement();
- }
-
- @Override
- public void releaseMemResource(IMNode node) {
- size.getAndDecrement();
- }
-
- @Override
- public void releaseMemResource(List<IMNode> evictedNodes) {
- size.getAndUpdate(value -> value -= evictedNodes.size());
- }
-
- @Override
- public void updatePinnedSize(int deltaSize) {
- // do nothing
- }
-
- @Override
- public void clear() {
- size.getAndSet(0);
- pinnedSize.getAndSet(0);
- }
-
- @Override
- public long getPinnedSize() {
- return pinnedSize.get();
- }
-
- @Override
- public long getCachedSize() {
- return size.get();
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/MemManagerHolder.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/ReleaseFlushStrategyNumBasedImpl.java
similarity index 52%
copy from server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/MemManagerHolder.java
copy to server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/ReleaseFlushStrategyNumBasedImpl.java
index f462009588..c6c469d8d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/MemManagerHolder.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/ReleaseFlushStrategyNumBasedImpl.java
@@ -16,26 +16,31 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.rescon.CachedSchemaEngineStatistics;
+
+/** Number-based threshold strategy */
+public class ReleaseFlushStrategyNumBasedImpl implements IReleaseFlushStrategy {
-public class MemManagerHolder {
+ private final int capacity;
- private static IMemManager memManagerInstance;
+ private final CachedSchemaEngineStatistics engineStatistics;
- public static void initMemManagerInstance() {
- if (IoTDBDescriptor.getInstance().getConfig().getCachedMNodeSizeInSchemaFileMode() >= 0) {
- memManagerInstance = new MemManagerNodeNumBasedImpl();
- } else {
- memManagerInstance = new MemManagerNodeEstimatedSizeBasedImpl();
- }
+ public ReleaseFlushStrategyNumBasedImpl(CachedSchemaEngineStatistics engineStatistics) {
+ this.engineStatistics = engineStatistics;
+ this.capacity = IoTDBDescriptor.getInstance().getConfig().getCachedMNodeSizeInSchemaFileMode();
}
- public static IMemManager getMemManagerInstance() {
- return memManagerInstance;
+ @Override
+ public boolean isExceedReleaseThreshold() {
+ return engineStatistics.getPinnedMNodeNum() + engineStatistics.getCachedMNodeNum()
+ > capacity * 0.6;
}
- private MemManagerHolder() {}
+ @Override
+ public boolean isExceedFlushThreshold() {
+ return engineStatistics.getPinnedMNodeNum() + engineStatistics.getCachedMNodeNum() > capacity;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/ReleaseFlushStrategySizeBasedImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/ReleaseFlushStrategySizeBasedImpl.java
new file mode 100644
index 0000000000..961ce49fc4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/ReleaseFlushStrategySizeBasedImpl.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.rescon.CachedSchemaEngineStatistics;
+
+/** Size-based threshold strategy: compares the engine's memory usage with the thresholds. */
+public class ReleaseFlushStrategySizeBasedImpl implements IReleaseFlushStrategy {
+
+ private final CachedSchemaEngineStatistics engineStatistics;
+
+ private final long releaseThreshold;
+ private final long flushThreshold;
+
+ private static final double RELEASE_THRESHOLD_RATIO = 0.6;
+ private static final double FLUSH_THRESHOLD_RATION = 0.75;
+
+ public ReleaseFlushStrategySizeBasedImpl(CachedSchemaEngineStatistics engineStatistics) {
+ this.engineStatistics = engineStatistics;
+ long capacity = IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForSchemaRegion();
+ this.releaseThreshold = (long) (capacity * RELEASE_THRESHOLD_RATIO);
+ this.flushThreshold = (long) (capacity * FLUSH_THRESHOLD_RATION);
+ }
+
+ @Override
+ public boolean isExceedReleaseThreshold() {
+ return engineStatistics.getMemoryUsage() > releaseThreshold;
+ }
+
+ @Override
+ public boolean isExceedFlushThreshold() {
+ return engineStatistics.getMemoryUsage() > flushThreshold;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/CachedSchemaEngineStatistics.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/CachedSchemaEngineStatistics.java
new file mode 100644
index 0000000000..727d4a0785
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/CachedSchemaEngineStatistics.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.rescon;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This class is used to record global statistics for SchemaEngine in Schema_File mode, which is a
+ * superset of the statistics in Memory mode
+ */
+public class CachedSchemaEngineStatistics extends MemSchemaEngineStatistics {
+
+ private final AtomicLong unpinnedMemorySize = new AtomicLong(0);
+ private final AtomicLong pinnedMemorySize = new AtomicLong(0);
+ private final AtomicLong unpinnedMNodeNum = new AtomicLong(0);
+ private final AtomicLong pinnedMNodeNum = new AtomicLong(0);
+
+ @Override
+ public void init() {
+ super.init();
+ unpinnedMemorySize.getAndSet(0);
+ pinnedMemorySize.getAndSet(0);
+ unpinnedMNodeNum.getAndSet(0);
+ pinnedMNodeNum.getAndSet(0);
+ }
+
+ public void updatePinnedMNodeNum(long delta) {
+ this.pinnedMNodeNum.addAndGet(delta);
+ }
+
+ public void updateUnpinnedMNodeNum(long delta) {
+ this.unpinnedMNodeNum.addAndGet(delta);
+ }
+
+ public void updatePinnedMemorySize(long delta) {
+ this.pinnedMemorySize.addAndGet(delta);
+ }
+
+ public void updateUnpinnedMemorySize(long delta) {
+ this.unpinnedMemorySize.addAndGet(delta);
+ }
+
+ public long getCachedMemorySize() {
+ return unpinnedMemorySize.get();
+ }
+
+ public long getPinnedMemorySize() {
+ return pinnedMemorySize.get();
+ }
+
+ public long getCachedMNodeNum() {
+ return unpinnedMNodeNum.get();
+ }
+
+ public long getPinnedMNodeNum() {
+ return pinnedMNodeNum.get();
+ }
+
+ @Override
+ public MemSchemaEngineStatistics getAsMemSchemaEngineStatistics() {
+ return this;
+ }
+
+ @Override
+ public CachedSchemaEngineStatistics getAsCachedSchemaEngineStatistics() {
+ return this;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/CachedSchemaRegionStatistics.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/CachedSchemaRegionStatistics.java
new file mode 100644
index 0000000000..b39a21e8ea
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/CachedSchemaRegionStatistics.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.rescon;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This class is used to record statistics within a SchemaRegion in Schema_File mode, which is a
+ * superset of the statistics in Memory mode
+ */
+public class CachedSchemaRegionStatistics extends MemSchemaRegionStatistics {
+
+ private final AtomicLong unpinnedMemorySize = new AtomicLong(0);
+ private final AtomicLong pinnedMemorySize = new AtomicLong(0);
+ private final AtomicLong unpinnedMNodeNum = new AtomicLong(0);
+ private final AtomicLong pinnedMNodeNum = new AtomicLong(0);
+ private final CachedSchemaEngineStatistics cachedEngineStatistics;
+
+ public CachedSchemaRegionStatistics(int schemaRegionId) {
+ super(schemaRegionId);
+ cachedEngineStatistics = schemaEngineStatistics.getAsCachedSchemaEngineStatistics();
+ }
+
+ public void updatePinnedMNodeNum(int delta) {
+ this.pinnedMNodeNum.addAndGet(delta);
+ cachedEngineStatistics.updatePinnedMNodeNum(delta);
+ }
+
+ public void updateUnpinnedMNodeNum(int delta) {
+ this.unpinnedMNodeNum.addAndGet(delta);
+ cachedEngineStatistics.updateUnpinnedMNodeNum(delta);
+ }
+
+ public void updatePinnedMemorySize(int delta) {
+ this.pinnedMemorySize.addAndGet(delta);
+ cachedEngineStatistics.updatePinnedMemorySize(delta);
+ }
+
+ public void updateUnpinnedMemorySize(int delta) {
+ this.unpinnedMemorySize.addAndGet(delta);
+ cachedEngineStatistics.updateUnpinnedMemorySize(delta);
+ }
+
+ public long getCachedMemorySize() {
+ return unpinnedMemorySize.get();
+ }
+
+ public long getPinnedMemorySize() {
+ return pinnedMemorySize.get();
+ }
+
+ public long getCachedMNodeNum() {
+ return unpinnedMNodeNum.get();
+ }
+
+ public long getPinnedMNodeNum() {
+ return pinnedMNodeNum.get();
+ }
+
+ @Override
+ public CachedSchemaRegionStatistics getAsCachedSchemaRegionStatistics() {
+ return this;
+ }
+
+ @Override
+ public MemSchemaRegionStatistics getAsMemSchemaRegionStatistics() {
+ return this;
+ }
+
+ @Override
+ public void clear() {
+ super.clear();
+ cachedEngineStatistics.updatePinnedMNodeNum(-pinnedMNodeNum.get());
+ cachedEngineStatistics.updateUnpinnedMNodeNum(-unpinnedMNodeNum.get());
+ cachedEngineStatistics.updatePinnedMemorySize(-pinnedMemorySize.get());
+ cachedEngineStatistics.updateUnpinnedMemorySize(-unpinnedMemorySize.get());
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/MemManagerHolder.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/ISchemaEngineStatistics.java
similarity index 57%
copy from server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/MemManagerHolder.java
copy to server/src/main/java/org/apache/iotdb/db/metadata/rescon/ISchemaEngineStatistics.java
index f462009588..aed059bba3 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/MemManagerHolder.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/ISchemaEngineStatistics.java
@@ -16,26 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.iotdb.db.metadata.rescon;
-package org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol;
+public interface ISchemaEngineStatistics {
+ void init();
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
+ boolean isAllowToCreateNewSeries();
-public class MemManagerHolder {
+ boolean isExceedCapacity();
- private static IMemManager memManagerInstance;
+ long getMemoryCapacity();
- public static void initMemManagerInstance() {
- if (IoTDBDescriptor.getInstance().getConfig().getCachedMNodeSizeInSchemaFileMode() >= 0) {
- memManagerInstance = new MemManagerNodeNumBasedImpl();
- } else {
- memManagerInstance = new MemManagerNodeEstimatedSizeBasedImpl();
- }
- }
+ long getMemoryUsage();
- public static IMemManager getMemManagerInstance() {
- return memManagerInstance;
- }
+ long getTotalSeriesNumber();
- private MemManagerHolder() {}
+ MemSchemaEngineStatistics getAsMemSchemaEngineStatistics();
+
+ CachedSchemaEngineStatistics getAsCachedSchemaEngineStatistics();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/IMemManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/ISchemaRegionStatistics.java
similarity index 58%
rename from server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/IMemManager.java
rename to server/src/main/java/org/apache/iotdb/db/metadata/rescon/ISchemaRegionStatistics.java
index 5837852a8f..a03d229b70 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/IMemManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/ISchemaRegionStatistics.java
@@ -16,37 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol;
+package org.apache.iotdb.db.metadata.rescon;
-import org.apache.iotdb.db.metadata.mnode.IMNode;
+public interface ISchemaRegionStatistics {
-import java.util.List;
+ boolean isAllowToCreateNewSeries();
-public interface IMemManager {
+ long getRegionMemoryUsage();
- void init();
+ int getSchemaRegionId();
- boolean isEmpty();
+ long getSeriesNumber();
- boolean isExceedReleaseThreshold();
+ MemSchemaRegionStatistics getAsMemSchemaRegionStatistics();
- boolean isExceedFlushThreshold();
-
- void requestPinnedMemResource(IMNode node);
-
- void upgradeMemResource(IMNode node);
-
- void releasePinnedMemResource(IMNode node);
-
- void releaseMemResource(IMNode node);
-
- void releaseMemResource(List<IMNode> evictedNodes);
-
- void updatePinnedSize(int deltaSize);
+ CachedSchemaRegionStatistics getAsCachedSchemaRegionStatistics();
void clear();
-
- long getPinnedSize();
-
- long getCachedSize();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/MemoryStatistics.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/MemSchemaEngineStatistics.java
similarity index 60%
rename from server/src/main/java/org/apache/iotdb/db/metadata/rescon/MemoryStatistics.java
rename to server/src/main/java/org/apache/iotdb/db/metadata/rescon/MemSchemaEngineStatistics.java
index 78d2e76db2..2c74292c7d 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/MemoryStatistics.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/MemSchemaEngineStatistics.java
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.iotdb.db.metadata.rescon;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -26,72 +25,87 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicLong;
-public class MemoryStatistics {
+/** This class is used to record the global statistics of SchemaEngine in Memory mode */
+public class MemSchemaEngineStatistics implements ISchemaEngineStatistics {
- private static final Logger logger = LoggerFactory.getLogger(MemoryStatistics.class);
+ private static final Logger logger = LoggerFactory.getLogger(MemSchemaEngineStatistics.class);
- /** threshold total size of MTree */
+ /** Total memory capacity allocated for schema regions */
private long memoryCapacity;
- private final AtomicLong memoryUsage = new AtomicLong(0);
-
- private volatile boolean allowToCreateNewSeries;
-
- private static class MemoryStatisticsHolder {
-
- private MemoryStatisticsHolder() {
- // allowed to do nothing
- }
+ protected final AtomicLong memoryUsage = new AtomicLong(0);
- private static final MemoryStatistics INSTANCE = new MemoryStatistics();
- }
+ private final AtomicLong totalSeriesNumber = new AtomicLong(0);
- public static MemoryStatistics getInstance() {
- return MemoryStatisticsHolder.INSTANCE;
- }
-
- private MemoryStatistics() {}
+ private volatile boolean allowToCreateNewSeries;
+ @Override
public void init() {
memoryCapacity = IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForSchemaRegion();
memoryUsage.getAndSet(0);
+ totalSeriesNumber.getAndSet(0);
allowToCreateNewSeries = true;
}
+ @Override
public boolean isAllowToCreateNewSeries() {
return allowToCreateNewSeries;
}
+ @Override
public boolean isExceedCapacity() {
return memoryUsage.get() > memoryCapacity;
}
+ @Override
public long getMemoryCapacity() {
return memoryCapacity;
}
+ @Override
public long getMemoryUsage() {
return memoryUsage.get();
}
public void requestMemory(long size) {
- memoryUsage.getAndUpdate(v -> v += size);
+ memoryUsage.addAndGet(size);
if (memoryUsage.get() >= memoryCapacity) {
- logger.warn("Current series number {} is too large...", memoryUsage);
+ logger.warn("Current series memory {} is too large...", memoryUsage);
allowToCreateNewSeries = false;
}
}
public void releaseMemory(long size) {
- memoryUsage.getAndUpdate(v -> v -= size);
+ memoryUsage.addAndGet(-size);
if (!allowToCreateNewSeries && memoryUsage.get() < memoryCapacity) {
- logger.info("Current series number {} come back to normal level", memoryUsage);
+ logger.info(
+ "Current series memory {} come back to normal level, total series number is {}.",
+ memoryUsage,
+ totalSeriesNumber);
allowToCreateNewSeries = true;
}
}
- public void clear() {
- memoryUsage.getAndSet(0);
- allowToCreateNewSeries = true;
+ @Override
+ public long getTotalSeriesNumber() {
+ return totalSeriesNumber.get();
+ }
+
+ public void addTimeseries(long addedNum) {
+ totalSeriesNumber.addAndGet(addedNum);
+ }
+
+ public void deleteTimeseries(long deletedNum) {
+ totalSeriesNumber.addAndGet(-deletedNum);
+ }
+
+ @Override
+ public MemSchemaEngineStatistics getAsMemSchemaEngineStatistics() {
+ return this;
+ }
+
+ @Override
+ public CachedSchemaEngineStatistics getAsCachedSchemaEngineStatistics() {
+ throw new UnsupportedOperationException("Wrong SchemaEngineStatistics Type");
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/MemSchemaRegionStatistics.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/MemSchemaRegionStatistics.java
new file mode 100644
index 0000000000..ef70db15b3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/MemSchemaRegionStatistics.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.rescon;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Records the statistics of a single SchemaRegion in Memory mode. Every change made through this
+ * class is applied to the region's own counters and forwarded to the engine-level {@link
+ * MemSchemaEngineStatistics}.
+ */
+public class MemSchemaRegionStatistics implements ISchemaRegionStatistics {
+
+ /** Engine-level statistics that receive every update made through this region. */
+ protected MemSchemaEngineStatistics schemaEngineStatistics =
+ SchemaEngineStatisticsHolder.getSchemaEngineStatistics().getAsMemSchemaEngineStatistics();
+
+ private final int schemaRegionId;
+ private final AtomicLong memoryUsage = new AtomicLong(0);
+ private final AtomicLong seriesNumber = new AtomicLong(0);
+
+ /**
+ * Creates empty statistics for the given schema region.
+ *
+ * @param schemaRegionId id of the schema region these statistics belong to
+ */
+ public MemSchemaRegionStatistics(int schemaRegionId) {
+ this.schemaRegionId = schemaRegionId;
+ }
+
+ /**
+ * Delegates to the engine-level statistics; the decision is not made per region.
+ *
+ * @return whether the engine statistics currently allow creating new series
+ */
+ @Override
+ public boolean isAllowToCreateNewSeries() {
+ return schemaEngineStatistics.isAllowToCreateNewSeries();
+ }
+
+ /**
+ * Adds {@code size} to this region's memory usage and requests the same amount from the engine
+ * statistics.
+ *
+ * @param size amount of memory to add
+ */
+ public void requestMemory(long size) {
+ memoryUsage.addAndGet(size);
+ schemaEngineStatistics.requestMemory(size);
+ }
+
+ /**
+ * Subtracts {@code size} from this region's memory usage and releases the same amount in the
+ * engine statistics.
+ *
+ * @param size amount of memory to subtract
+ */
+ public void releaseMemory(long size) {
+ memoryUsage.addAndGet(-size);
+ schemaEngineStatistics.releaseMemory(size);
+ }
+
+ /** @return the number of series currently recorded for this region */
+ @Override
+ public long getSeriesNumber() {
+ return seriesNumber.get();
+ }
+
+ /**
+ * Adds {@code addedNum} to this region's series count and to the engine statistics.
+ *
+ * @param addedNum number of series added
+ */
+ public void addTimeseries(long addedNum) {
+ seriesNumber.addAndGet(addedNum);
+ schemaEngineStatistics.addTimeseries(addedNum);
+ }
+
+ /**
+ * Subtracts {@code deletedNum} from this region's series count and from the engine statistics.
+ *
+ * @param deletedNum number of series deleted
+ */
+ public void deleteTimeseries(long deletedNum) {
+ seriesNumber.addAndGet(-deletedNum);
+ schemaEngineStatistics.deleteTimeseries(deletedNum);
+ }
+
+ /** @return the memory usage currently recorded for this region */
+ @Override
+ public long getRegionMemoryUsage() {
+ return memoryUsage.get();
+ }
+
+ /** @return the id of the schema region these statistics belong to */
+ @Override
+ public int getSchemaRegionId() {
+ return schemaRegionId;
+ }
+
+ /** @return {@code this} */
+ @Override
+ public MemSchemaRegionStatistics getAsMemSchemaRegionStatistics() {
+ return this;
+ }
+
+ /**
+ * Always fails, because this is the Memory-mode implementation.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public CachedSchemaRegionStatistics getAsCachedSchemaRegionStatistics() {
+ throw new UnsupportedOperationException("Wrong SchemaRegionStatistics Type");
+ }
+
+ /**
+ * Removes this region's memory usage and series count from the engine statistics and resets the
+ * region counters to zero.
+ *
+ * <p>Each counter is read and zeroed with a single {@code getAndSet(0)}, so an update racing with
+ * this call is either included in the amount handed back to the engine statistics or left in the
+ * counter; it cannot be dropped between a separate read and reset.
+ */
+ @Override
+ public void clear() {
+ schemaEngineStatistics.releaseMemory(memoryUsage.getAndSet(0));
+ schemaEngineStatistics.deleteTimeseries(seriesNumber.getAndSet(0));
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/MemManagerHolder.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaEngineStatisticsHolder.java
similarity index 56%
rename from server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/MemManagerHolder.java
rename to server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaEngineStatisticsHolder.java
index f462009588..9226c2ccfe 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/memcontrol/MemManagerHolder.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaEngineStatisticsHolder.java
@@ -16,26 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-
-package org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol;
+package org.apache.iotdb.db.metadata.rescon;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-public class MemManagerHolder {
-
- private static IMemManager memManagerInstance;
+// TODO: SchemaEngineStatistics should not be singleton later
+public class SchemaEngineStatisticsHolder {
+ private static MemSchemaEngineStatistics schemaEngineStatistics;
- public static void initMemManagerInstance() {
- if (IoTDBDescriptor.getInstance().getConfig().getCachedMNodeSizeInSchemaFileMode() >= 0) {
- memManagerInstance = new MemManagerNodeNumBasedImpl();
+ public static void initSchemaEngineStatisticsInstance() {
+ if (IoTDBDescriptor.getInstance().getConfig().getSchemaEngineMode().equals("Memory")) {
+ schemaEngineStatistics = new MemSchemaEngineStatistics();
} else {
- memManagerInstance = new MemManagerNodeEstimatedSizeBasedImpl();
+ schemaEngineStatistics = new CachedSchemaEngineStatistics();
}
+ schemaEngineStatistics.init();
}
- public static IMemManager getMemManagerInstance() {
- return memManagerInstance;
+ public static ISchemaEngineStatistics getSchemaEngineStatistics() {
+ return schemaEngineStatistics;
}
- private MemManagerHolder() {}
+ private SchemaEngineStatisticsHolder() {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
index 771a010767..0673b196d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.metadata.rescon;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.metadata.mtree.store.disk.cache.CacheMemoryManager;
-import org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol.MemManagerHolder;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngineMode;
public class SchemaResourceManager {
@@ -30,11 +29,11 @@ public class SchemaResourceManager {
private SchemaResourceManager() {}
public static void initSchemaResource() {
+ SchemaEngineStatisticsHolder.initSchemaEngineStatisticsInstance();
MetricService.getInstance()
.addMetricSet(
new SchemaResourceManagerMetrics(
- SchemaStatisticsManager.getInstance(), MemoryStatistics.getInstance()));
- MemoryStatistics.getInstance().init();
+ SchemaEngineStatisticsHolder.getSchemaEngineStatistics()));
if (IoTDBDescriptor.getInstance()
.getConfig()
.getSchemaEngineMode()
@@ -44,8 +43,6 @@ public class SchemaResourceManager {
}
public static void clearSchemaResource() {
- SchemaStatisticsManager.getInstance().clear();
- MemoryStatistics.getInstance().clear();
if (IoTDBDescriptor.getInstance()
.getConfig()
.getSchemaEngineMode()
@@ -55,13 +52,10 @@ public class SchemaResourceManager {
}
private static void initSchemaFileModeResource() {
- MemManagerHolder.initMemManagerInstance();
- MemManagerHolder.getMemManagerInstance().init();
CacheMemoryManager.getInstance().init();
}
private static void clearSchemaFileModeResource() {
- MemManagerHolder.getMemManagerInstance().clear();
CacheMemoryManager.getInstance().clear();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManagerMetrics.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManagerMetrics.java
index 60ad16b6c4..d70e620b5b 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManagerMetrics.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManagerMetrics.java
@@ -29,14 +29,10 @@ import org.apache.iotdb.metrics.utils.MetricType;
import java.util.Objects;
public class SchemaResourceManagerMetrics implements IMetricSet {
- private final SchemaStatisticsManager schemaStatisticsManager;
+ private final ISchemaEngineStatistics schemaEngineStatistics;
- private final MemoryStatistics memoryStatistics;
-
- public SchemaResourceManagerMetrics(
- SchemaStatisticsManager schemaStatisticsManager, MemoryStatistics memoryStatistics) {
- this.schemaStatisticsManager = schemaStatisticsManager;
- this.memoryStatistics = memoryStatistics;
+ public SchemaResourceManagerMetrics(ISchemaEngineStatistics schemaEngineStatistics) {
+ this.schemaEngineStatistics = schemaEngineStatistics;
}
@Override
@@ -44,25 +40,25 @@ public class SchemaResourceManagerMetrics implements IMetricSet {
metricService.createAutoGauge(
Metric.QUANTITY.toString(),
MetricLevel.CORE,
- schemaStatisticsManager,
- SchemaStatisticsManager::getTotalSeriesNumber,
+ schemaEngineStatistics,
+ ISchemaEngineStatistics::getTotalSeriesNumber,
Tag.NAME.toString(),
"timeSeries");
metricService.createAutoGauge(
Metric.MEM.toString(),
MetricLevel.IMPORTANT,
- memoryStatistics,
- MemoryStatistics::getMemoryUsage,
+ schemaEngineStatistics,
+ ISchemaEngineStatistics::getMemoryUsage,
Tag.NAME.toString(),
"schema_region_total_usage");
metricService.createAutoGauge(
Metric.MEM.toString(),
MetricLevel.IMPORTANT,
- memoryStatistics,
- memoryStatistics ->
- memoryStatistics.getMemoryCapacity() - memoryStatistics.getMemoryUsage(),
+ schemaEngineStatistics,
+ schemaEngineStatistics ->
+ schemaEngineStatistics.getMemoryCapacity() - schemaEngineStatistics.getMemoryUsage(),
Tag.NAME.toString(),
"schema_region_total_remaining");
}
@@ -90,11 +86,11 @@ public class SchemaResourceManagerMetrics implements IMetricSet {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SchemaResourceManagerMetrics that = (SchemaResourceManagerMetrics) o;
- return Objects.equals(schemaStatisticsManager, that.schemaStatisticsManager);
+ return Objects.equals(schemaEngineStatistics, that.schemaEngineStatistics);
}
@Override
public int hashCode() {
- return Objects.hash(schemaStatisticsManager);
+ return Objects.hash(schemaEngineStatistics);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java
deleted file mode 100644
index 6a0b65143e..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.metadata.rescon;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-public class SchemaStatisticsManager {
-
- private final AtomicLong totalSeriesNumber = new AtomicLong();
-
- private static class SchemaStatisticsHolder {
-
- private SchemaStatisticsHolder() {
- // allowed to do nothing
- }
-
- private static final SchemaStatisticsManager INSTANCE = new SchemaStatisticsManager();
- }
-
- /** we should not use this function in other place, but only in IoTDB class */
- public static SchemaStatisticsManager getInstance() {
- return SchemaStatisticsHolder.INSTANCE;
- }
-
- public long getTotalSeriesNumber() {
- return totalSeriesNumber.get();
- }
-
- public void addTimeseries(long addedNum) {
- totalSeriesNumber.addAndGet(addedNum);
- }
-
- public void deleteTimeseries(long deletedNum) {
- totalSeriesNumber.addAndGet(-deletedNum);
- }
-
- public void clear() {
- this.totalSeriesNumber.getAndSet(0);
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
index 91b6fff7be..82c918f65b 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.metadata.plan.schemaregion.read.IShowDevicesPlan;
import org.apache.iotdb.db.metadata.plan.schemaregion.read.IShowNodesPlan;
import org.apache.iotdb.db.metadata.plan.schemaregion.read.IShowTimeSeriesPlan;
@@ -37,6 +38,7 @@ import org.apache.iotdb.db.metadata.query.info.IDeviceSchemaInfo;
import org.apache.iotdb.db.metadata.query.info.INodeSchemaInfo;
import org.apache.iotdb.db.metadata.query.info.ITimeSeriesSchemaInfo;
import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
+import org.apache.iotdb.db.metadata.rescon.ISchemaRegionStatistics;
import org.apache.iotdb.db.metadata.template.Template;
import java.io.File;
@@ -73,6 +75,9 @@ public interface ISchemaRegion {
void clear();
void forceMlog();
+
+ @TestOnly
+ ISchemaRegionStatistics getSchemaRegionStatistics();
// endregion
// region Interfaces for schema region Info query and operation
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
index 2f60e6e1d4..a642313296 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
@@ -142,7 +142,7 @@ public class SchemaEngine {
* Scan the database and schema region directories to recover schema regions and return the
* collected local schema partition info for localSchemaPartitionTable recovery.
*/
- private Map<PartialPath, List<SchemaRegionId>> initSchemaRegion() throws MetadataException {
+ private Map<PartialPath, List<SchemaRegionId>> initSchemaRegion() {
Map<PartialPath, List<SchemaRegionId>> partitionTable = new HashMap<>();
File schemaDir = new File(config.getSchemaDir());
@@ -223,14 +223,16 @@ public class SchemaEngine {
}
public void clear() {
+ // clearSchemaResource will shut down the release and flush tasks in Schema_File mode, which
+ // must be done before clearing the schema regions
SchemaResourceManager.clearSchemaResource();
-
if (timedForceMLogThread != null) {
timedForceMLogThread.shutdown();
timedForceMLogThread = null;
}
if (schemaRegionMap != null) {
+ // SchemaEngineStatistics will be cleared once all schema regions have been cleared
for (ISchemaRegion schemaRegion : schemaRegionMap.values()) {
schemaRegion.clear();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index 5296c1dc92..7c57b5febd 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -66,8 +66,7 @@ import org.apache.iotdb.db.metadata.query.info.IDeviceSchemaInfo;
import org.apache.iotdb.db.metadata.query.info.INodeSchemaInfo;
import org.apache.iotdb.db.metadata.query.info.ITimeSeriesSchemaInfo;
import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
-import org.apache.iotdb.db.metadata.rescon.MemoryStatistics;
-import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
+import org.apache.iotdb.db.metadata.rescon.MemSchemaRegionStatistics;
import org.apache.iotdb.db.metadata.tag.TagManager;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.utils.SchemaUtils;
@@ -136,8 +135,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
private boolean usingMLog = true;
private SchemaLogWriter<ISchemaRegionPlan> logWriter;
- private SchemaStatisticsManager schemaStatisticsManager = SchemaStatisticsManager.getInstance();
- private MemoryStatistics memoryStatistics = MemoryStatistics.getInstance();
+ private final MemSchemaRegionStatistics regionStatistics;
private MTreeBelowSGMemoryImpl mtree;
private TagManager tagManager;
@@ -171,6 +169,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
}
this.seriesNumerMonitor = seriesNumerMonitor;
+ this.regionStatistics = new MemSchemaRegionStatistics(schemaRegionId.getId());
init();
}
@@ -191,7 +190,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
tagManager = new TagManager(schemaRegionDirPath);
mtree =
new MTreeBelowSGMemoryImpl(
- new PartialPath(storageGroupFullPath), tagManager::readTags, schemaRegionId.getId());
+ new PartialPath(storageGroupFullPath), tagManager::readTags, regionStatistics);
if (!(config.isClusterMode()
&& config
@@ -273,6 +272,11 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
}
}
+ @Override
+ public MemSchemaRegionStatistics getSchemaRegionStatistics() {
+ return regionStatistics;
+ }
+
/**
* Init from metadata log file.
*
@@ -348,6 +352,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
if (this.mtree != null) {
this.mtree.clear();
}
+ this.regionStatistics.clear();
if (logWriter != null) {
logWriter.close();
logWriter = null;
@@ -378,8 +383,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
@Override
public synchronized void deleteSchemaRegion() throws MetadataException {
// collect all the LeafMNode in this schema region
- long seriesCount = mtree.countAllMeasurement();
- schemaStatisticsManager.deleteTimeseries(seriesCount);
+ long seriesCount = regionStatistics.getSeriesNumber();
if (seriesNumerMonitor != null) {
seriesNumerMonitor.deleteTimeSeries((int) seriesCount);
}
@@ -452,8 +456,9 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
MTreeBelowSGMemoryImpl.loadFromSnapshot(
latestSnapshotRootDir,
storageGroupFullPath,
- schemaRegionId.getId(),
+ regionStatistics,
measurementMNode -> {
+ regionStatistics.addTimeseries(1L);
if (measurementMNode.getOffset() == -1) {
return;
}
@@ -511,7 +516,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
@Override
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void createTimeseries(ICreateTimeSeriesPlan plan, long offset) throws MetadataException {
- if (!memoryStatistics.isAllowToCreateNewSeries()) {
+ if (!regionStatistics.isAllowToCreateNewSeries()) {
throw new SeriesOverflowException();
}
@@ -545,7 +550,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
}
// update statistics and schemaDataTypeNumMap
- schemaStatisticsManager.addTimeseries(1L);
+ regionStatistics.addTimeseries(1L);
// update tag index
if (offset != -1 && isRecovering) {
@@ -591,7 +596,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
@Override
public void createAlignedTimeSeries(ICreateAlignedTimeSeriesPlan plan) throws MetadataException {
int seriesCount = plan.getMeasurements().size();
- if (!memoryStatistics.isAllowToCreateNewSeries()) {
+ if (!regionStatistics.isAllowToCreateNewSeries()) {
throw new SeriesOverflowException();
}
@@ -631,7 +636,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
}
// update statistics and schemaDataTypeNumMap
- schemaStatisticsManager.addTimeseries(seriesCount);
+ regionStatistics.addTimeseries(seriesCount);
List<Long> tagOffsets = plan.getTagOffsets();
for (int i = 0; i < measurements.size(); i++) {
@@ -769,7 +774,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
IMeasurementMNode measurementMNode = pair.right;
removeFromTagInvertedIndex(measurementMNode);
- schemaStatisticsManager.deleteTimeseries(1L);
+ regionStatistics.deleteTimeseries(1L);
if (seriesNumerMonitor != null) {
seriesNumerMonitor.deleteTimeSeries(1);
}
@@ -793,7 +798,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
removeFromTagInvertedIndex(measurementMNode);
PartialPath storageGroupPath = pair.left;
- schemaStatisticsManager.deleteTimeseries(1L);
+ regionStatistics.deleteTimeseries(1L);
if (seriesNumerMonitor != null) {
seriesNumerMonitor.deleteTimeSeries(1);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index 48c4511e65..3fd9f63960 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -67,8 +67,8 @@ import org.apache.iotdb.db.metadata.query.info.IDeviceSchemaInfo;
import org.apache.iotdb.db.metadata.query.info.INodeSchemaInfo;
import org.apache.iotdb.db.metadata.query.info.ITimeSeriesSchemaInfo;
import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
-import org.apache.iotdb.db.metadata.rescon.MemoryStatistics;
-import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager;
+import org.apache.iotdb.db.metadata.rescon.CachedSchemaRegionStatistics;
+import org.apache.iotdb.db.metadata.rescon.MemSchemaRegionStatistics;
import org.apache.iotdb.db.metadata.tag.TagManager;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.utils.SchemaUtils;
@@ -140,9 +140,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
private SchemaLogWriter<ISchemaRegionPlan> logWriter;
private MLogDescriptionWriter logDescriptionWriter;
- private final SchemaStatisticsManager schemaStatisticsManager =
- SchemaStatisticsManager.getInstance();
- private final MemoryStatistics memoryStatistics = MemoryStatistics.getInstance();
+ private final CachedSchemaRegionStatistics regionStatistics;
private MTreeBelowSGCachedImpl mtree;
private TagManager tagManager;
@@ -164,6 +162,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
schemaRegionDirPath = storageGroupDirPath + File.separator + schemaRegionId.getId();
this.seriesNumerMonitor = seriesNumerMonitor;
+ this.regionStatistics = new CachedSchemaRegionStatistics(schemaRegionId.getId());
init();
}
@@ -199,7 +198,8 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
schemaRegionId);
}
},
- schemaRegionId.getId());
+ schemaRegionId.getId(),
+ regionStatistics);
if (!(config.isClusterMode()
&& config
@@ -297,6 +297,11 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
}
}
+ @Override
+ public MemSchemaRegionStatistics getSchemaRegionStatistics() {
+ return regionStatistics;
+ }
+
/** Init from metadata log file. */
@SuppressWarnings("squid:S3776")
private void initFromLog() throws IOException {
@@ -378,6 +383,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
if (this.mtree != null) {
this.mtree.clear();
}
+ this.regionStatistics.clear();
if (logWriter != null) {
logWriter.close();
logWriter = null;
@@ -413,8 +419,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
@Override
public synchronized void deleteSchemaRegion() throws MetadataException {
// collect all the LeafMNode in this schema region
- long seriesCount = mtree.countAllMeasurement();
- schemaStatisticsManager.deleteTimeseries(seriesCount);
+ long seriesCount = regionStatistics.getSeriesNumber();
if (seriesNumerMonitor != null) {
seriesNumerMonitor.deleteTimeSeries((int) seriesCount);
}
@@ -487,7 +492,9 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
latestSnapshotRootDir,
storageGroupFullPath,
schemaRegionId.getId(),
+ regionStatistics,
measurementMNode -> {
+ regionStatistics.addTimeseries(1L);
if (measurementMNode.getOffset() == -1) {
return;
}
@@ -568,9 +575,9 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
@Override
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void createTimeseries(ICreateTimeSeriesPlan plan, long offset) throws MetadataException {
- if (!memoryStatistics.isAllowToCreateNewSeries()) {
+ if (!regionStatistics.isAllowToCreateNewSeries()) {
CacheMemoryManager.getInstance().waitIfReleasing();
- if (!memoryStatistics.isAllowToCreateNewSeries()) {
+ if (!regionStatistics.isAllowToCreateNewSeries()) {
logger.warn("Series overflow when creating: [{}]", plan.getPath().getFullPath());
throw new SeriesOverflowException();
}
@@ -607,7 +614,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
try {
// update statistics and schemaDataTypeNumMap
- schemaStatisticsManager.addTimeseries(1L);
+ regionStatistics.addTimeseries(1L);
// update tag index
if (offset != -1 && isRecovering) {
@@ -691,9 +698,9 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
@Override
public void createAlignedTimeSeries(ICreateAlignedTimeSeriesPlan plan) throws MetadataException {
int seriesCount = plan.getMeasurements().size();
- if (!memoryStatistics.isAllowToCreateNewSeries()) {
+ if (!regionStatistics.isAllowToCreateNewSeries()) {
CacheMemoryManager.getInstance().waitIfReleasing();
- if (!memoryStatistics.isAllowToCreateNewSeries()) {
+ if (!regionStatistics.isAllowToCreateNewSeries()) {
throw new SeriesOverflowException();
}
}
@@ -735,7 +742,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
try {
// update statistics and schemaDataTypeNumMap
- schemaStatisticsManager.addTimeseries(seriesCount);
+ regionStatistics.addTimeseries(seriesCount);
List<Long> tagOffsets = plan.getTagOffsets();
for (int i = 0; i < measurements.size(); i++) {
@@ -876,7 +883,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
IMeasurementMNode measurementMNode = pair.right;
removeFromTagInvertedIndex(measurementMNode);
- schemaStatisticsManager.deleteTimeseries(1L);
+ regionStatistics.deleteTimeseries(1L);
if (seriesNumerMonitor != null) {
seriesNumerMonitor.deleteTimeSeries(1);
}
@@ -895,7 +902,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
removeFromTagInvertedIndex(measurementMNode);
PartialPath storageGroupPath = pair.left;
- schemaStatisticsManager.deleteTimeseries(1L);
+ regionStatistics.deleteTimeseries(1L);
if (seriesNumerMonitor != null) {
seriesNumerMonitor.deleteTimeSeries(1);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGTest.java
index aa84464871..b3bfda3848 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGTest.java
@@ -25,6 +25,9 @@ import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
+import org.apache.iotdb.db.metadata.mtree.store.disk.cache.CacheMemoryManager;
+import org.apache.iotdb.db.metadata.rescon.CachedSchemaRegionStatistics;
+import org.apache.iotdb.db.metadata.rescon.MemSchemaRegionStatistics;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngineMode;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -66,6 +69,7 @@ public abstract class MTreeBelowSGTest {
@After
public void tearDown() throws Exception {
+ CacheMemoryManager.getInstance().clear();
root.clear();
root = null;
for (IMTreeBelowSG mtree : usedMTree) {
@@ -93,9 +97,10 @@ public abstract class MTreeBelowSGTest {
node -> {
// do nothing
},
- 0);
+ 0,
+ new CachedSchemaRegionStatistics(0));
} else {
- mtree = new MTreeBelowSGMemoryImpl(path, null, 0);
+ mtree = new MTreeBelowSGMemoryImpl(path, null, new MemSchemaRegionStatistics(0));
}
usedMTree.add(mtree);
return mtree;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
new file mode 100644
index 0000000000..867fa497d7
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.schemaRegion;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.db.metadata.mnode.EntityMNode;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.mnode.InternalMNode;
+import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.StorageGroupEntityMNode;
+import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
+import org.apache.iotdb.db.metadata.mnode.estimator.BasicMNodSizeEstimator;
+import org.apache.iotdb.db.metadata.mnode.estimator.IMNodeSizeEstimator;
+import org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol.CachedMNodeSizeEstimator;
+import org.apache.iotdb.db.metadata.rescon.CachedSchemaEngineStatistics;
+import org.apache.iotdb.db.metadata.rescon.CachedSchemaRegionStatistics;
+import org.apache.iotdb.db.metadata.rescon.ISchemaEngineStatistics;
+import org.apache.iotdb.db.metadata.rescon.SchemaEngineStatisticsHolder;
+import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+/**
+ * Checks the statistics exposed by schema regions and by the schema engine: memory usage, time
+ * series count and, for the Schema_File engine mode, pinned/cached node statistics.
+ *
+ * <p>All tests share one fixture, built by {@code createAndPruneTimeseries}: three time series
+ * are created in each of two schema regions and every series matching {@code root.**.s1} is then
+ * deleted through the black list, leaving two series per region.
+ */
+public class SchemaStatisticsTest extends AbstractSchemaRegionTest {
+
+  public SchemaStatisticsTest(AbstractSchemaRegionTest.SchemaRegionTestParams testParams) {
+    super(testParams);
+  }
+
+  /**
+   * Compares the memory usage reported by each region, and by the engine, with the estimated size
+   * of the nodes the test expects to be accounted for. Currently disabled via {@code @Ignore}.
+   *
+   * <p>In the "SchemaFile-PartialMemory" and "SchemaFile-NonMemory" test modes only the storage
+   * group node of each region is expected to remain; in every other mode the complete remaining
+   * tree is expected.
+   */
+  @Test
+  @Ignore
+  public void testMemoryStatistics() throws Exception {
+    ISchemaRegion schemaRegion1 = getSchemaRegion("root.sg1", 0);
+    ISchemaRegion schemaRegion2 = getSchemaRegion("root.sg2", 1);
+    ISchemaEngineStatistics engineStatistics =
+        SchemaEngineStatisticsHolder.getSchemaEngineStatistics();
+
+    createAndPruneTimeseries(schemaRegion1, schemaRegion2);
+
+    if (testParams.getTestModeName().equals("SchemaFile-PartialMemory")
+        || testParams.getTestModeName().equals("SchemaFile-NonMemory")) {
+      // wait release and flush task
+      Thread.sleep(1000);
+      IMNodeSizeEstimator estimator = new CachedMNodeSizeEstimator();
+      // schemaRegion1
+      long size1 = estimator.estimateSize(newSg1Node());
+      Assert.assertEquals(size1, schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage());
+      // schemaRegion2
+      long size2 = estimator.estimateSize(newSg2Node());
+      Assert.assertEquals(size2, schemaRegion2.getSchemaRegionStatistics().getRegionMemoryUsage());
+      Assert.assertEquals(size1 + size2, engineStatistics.getMemoryUsage());
+    } else {
+      IMNodeSizeEstimator estimator =
+          testParams.getSchemaEngineMode().equals("Memory")
+              ? new BasicMNodSizeEstimator()
+              : new CachedMNodeSizeEstimator();
+      // schemaRegion1: sg1, sg1.d0, sg1.d1, sg1.d1.s2 and sg1.d1.s2.t1 (sg1.d1.s1 was deleted)
+      IMNode sg1 = newSg1Node();
+      long size1 = estimator.estimateSize(sg1);
+      size1 += estimator.estimateSize(newInt64Measurement(sg1, "d0"));
+      IMNode tmp = new InternalMNode(sg1, "d1");
+      size1 += estimator.estimateSize(tmp);
+      tmp = new EntityMNode(tmp, "s2");
+      size1 += estimator.estimateSize(tmp);
+      size1 += estimator.estimateSize(newInt64Measurement(tmp, "t1"));
+      Assert.assertEquals(size1, schemaRegion1.getSchemaRegionStatistics().getRegionMemoryUsage());
+      // schemaRegion2: sg2, sg2.d1, sg2.d1.s3, sg2.d2 and sg2.d2.s2 (sg2.d2.s1 was deleted)
+      IMNode sg2 = newSg2Node();
+      long size2 = estimator.estimateSize(sg2);
+      tmp = new EntityMNode(sg2, "d1");
+      size2 += estimator.estimateSize(tmp);
+      size2 += estimator.estimateSize(newInt64Measurement(tmp, "s3"));
+      tmp = new EntityMNode(sg2, "d2");
+      size2 += estimator.estimateSize(tmp);
+      size2 += estimator.estimateSize(newInt64Measurement(tmp, "s2"));
+      Assert.assertEquals(size2, schemaRegion2.getSchemaRegionStatistics().getRegionMemoryUsage());
+      Assert.assertEquals(size1 + size2, engineStatistics.getMemoryUsage());
+    }
+    Assert.assertEquals(0, schemaRegion1.getSchemaRegionStatistics().getSchemaRegionId());
+    Assert.assertEquals(1, schemaRegion2.getSchemaRegionStatistics().getSchemaRegionId());
+    checkSchemaFileStatistics(engineStatistics);
+  }
+
+  /**
+   * For cached engine statistics, asserts that the total memory usage equals the pinned memory
+   * size plus the cached memory size. Other implementations are not checked.
+   */
+  private void checkSchemaFileStatistics(ISchemaEngineStatistics engineStatistics) {
+    if (engineStatistics instanceof CachedSchemaEngineStatistics) {
+      CachedSchemaEngineStatistics cachedEngineStatistics =
+          (CachedSchemaEngineStatistics) engineStatistics;
+      Assert.assertEquals(
+          cachedEngineStatistics.getMemoryUsage(),
+          cachedEngineStatistics.getPinnedMemorySize()
+              + cachedEngineStatistics.getCachedMemorySize());
+    }
+  }
+
+  /**
+   * Asserts that, after the shared fixture is applied, each region reports two time series and
+   * the engine reports four in total.
+   */
+  @Test
+  public void testSeriesNumStatistics() throws Exception {
+    ISchemaRegion schemaRegion1 = getSchemaRegion("root.sg1", 0);
+    ISchemaRegion schemaRegion2 = getSchemaRegion("root.sg2", 1);
+    ISchemaEngineStatistics engineStatistics =
+        SchemaEngineStatisticsHolder.getSchemaEngineStatistics();
+
+    createAndPruneTimeseries(schemaRegion1, schemaRegion2);
+
+    // check series number
+    Assert.assertEquals(2, schemaRegion1.getSchemaRegionStatistics().getSeriesNumber());
+    Assert.assertEquals(2, schemaRegion2.getSchemaRegionStatistics().getSeriesNumber());
+    Assert.assertEquals(4, engineStatistics.getTotalSeriesNumber());
+  }
+
+  /**
+   * Checks pinned/cached node counts per region and that the engine-level pinned/cached node
+   * counts and memory sizes equal the sums over both regions. Does nothing unless the engine mode
+   * is "Schema_File". Currently disabled via {@code @Ignore}.
+   */
+  @Test
+  @Ignore
+  public void testSchemaFileNodeStatistics() throws Exception {
+    if (!testParams.getSchemaEngineMode().equals("Schema_File")) {
+      return;
+    }
+    ISchemaRegion schemaRegion1 = getSchemaRegion("root.sg1", 0);
+    ISchemaRegion schemaRegion2 = getSchemaRegion("root.sg2", 1);
+    CachedSchemaEngineStatistics engineStatistics =
+        SchemaEngineStatisticsHolder.getSchemaEngineStatistics()
+            .getAsCachedSchemaEngineStatistics();
+
+    createAndPruneTimeseries(schemaRegion1, schemaRegion2);
+
+    // NOTE(review): presumably waits for the release and flush task, as in testMemoryStatistics
+    Thread.sleep(1000);
+    CachedSchemaRegionStatistics cachedRegionStatistics1 =
+        schemaRegion1.getSchemaRegionStatistics().getAsCachedSchemaRegionStatistics();
+    CachedSchemaRegionStatistics cachedRegionStatistics2 =
+        schemaRegion2.getSchemaRegionStatistics().getAsCachedSchemaRegionStatistics();
+
+    // check correctness of statistics: one pinned node per region in every configuration; the
+    // four remaining nodes are expected to stay cached only when the cache holds more than 3
+    long expectedCachedMNodeNum = testParams.getCachedMNodeSize() > 3 ? 4 : 0;
+    Assert.assertEquals(1, cachedRegionStatistics1.getPinnedMNodeNum());
+    Assert.assertEquals(expectedCachedMNodeNum, cachedRegionStatistics1.getCachedMNodeNum());
+    Assert.assertEquals(1, cachedRegionStatistics2.getPinnedMNodeNum());
+    Assert.assertEquals(expectedCachedMNodeNum, cachedRegionStatistics2.getCachedMNodeNum());
+
+    // check consistency between region and engine
+    Assert.assertEquals(
+        cachedRegionStatistics1.getPinnedMNodeNum() + cachedRegionStatistics2.getPinnedMNodeNum(),
+        engineStatistics.getPinnedMNodeNum());
+    Assert.assertEquals(
+        cachedRegionStatistics1.getCachedMNodeNum() + cachedRegionStatistics2.getCachedMNodeNum(),
+        engineStatistics.getCachedMNodeNum());
+    Assert.assertEquals(
+        cachedRegionStatistics1.getPinnedMemorySize()
+            + cachedRegionStatistics2.getPinnedMemorySize(),
+        engineStatistics.getPinnedMemorySize());
+    Assert.assertEquals(
+        cachedRegionStatistics1.getCachedMemorySize()
+            + cachedRegionStatistics2.getCachedMemorySize(),
+        engineStatistics.getCachedMemorySize());
+  }
+
+  /**
+   * Builds the fixture shared by all tests: creates three time series in each region, then
+   * black-lists and deletes every series matching {@code root.**.s1}. Asserts that each region
+   * black-listed at least one series.
+   */
+  private static void createAndPruneTimeseries(
+      ISchemaRegion schemaRegion1, ISchemaRegion schemaRegion2) throws Exception {
+    SchemaRegionTestUtil.createSimpleTimeseriesByList(
+        schemaRegion1, Arrays.asList("root.sg1.d0", "root.sg1.d1.s1", "root.sg1.d1.s2.t1"));
+    SchemaRegionTestUtil.createSimpleTimeseriesByList(
+        schemaRegion2, Arrays.asList("root.sg2.d1.s3", "root.sg2.d2.s1", "root.sg2.d2.s2"));
+    PathPatternTree patternTree = new PathPatternTree();
+    patternTree.appendPathPattern(new PartialPath("root.**.s1"));
+    patternTree.constructTree();
+    Assert.assertTrue(schemaRegion1.constructSchemaBlackList(patternTree) >= 1);
+    Assert.assertTrue(schemaRegion2.constructSchemaBlackList(patternTree) >= 1);
+    schemaRegion1.deleteTimeseriesInBlackList(patternTree);
+    schemaRegion2.deleteTimeseriesInBlackList(patternTree);
+  }
+
+  /**
+   * Returns a detached node modelling {@code root.sg1} with the default TTL. It is an entity
+   * storage group node; presumably because {@code root.sg1.d0} is a series directly below it.
+   */
+  private static IMNode newSg1Node() {
+    IMNode sg1 =
+        new StorageGroupEntityMNode(
+            null, "sg1", CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs());
+    sg1.setFullPath("root.sg1");
+    return sg1;
+  }
+
+  /** Returns a detached, non-entity node modelling {@code root.sg2} with the default TTL. */
+  private static IMNode newSg2Node() {
+    IMNode sg2 =
+        new StorageGroupMNode(
+            null, "sg2", CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs());
+    sg2.setFullPath("root.sg2");
+    return sg2;
+  }
+
+  /**
+   * Returns a measurement node named {@code name} under {@code parent}, with an
+   * INT64/PLAIN/SNAPPY schema and no alias, used only for size estimation.
+   */
+  private static IMNode newInt64Measurement(IMNode parent, String name) {
+    return new MeasurementMNode(
+        parent,
+        name,
+        new MeasurementSchema(name, TSDataType.INT64, TSEncoding.PLAIN, CompressionType.SNAPPY),
+        null);
+  }
+}