You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/10/14 02:15:51 UTC
[iotdb] branch master updated: [IOTDB-4022] Adjust the storage_group_report_threshold according to memory for writing (#6920)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new aaa4147cd8 [IOTDB-4022] Adjust the storage_group_report_threshold according to memory for writing (#6920)
aaa4147cd8 is described below
commit aaa4147cd84c69cde4b7b033368af2632e0d6cea
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Fri Oct 14 10:15:45 2022 +0800
[IOTDB-4022] Adjust the storage_group_report_threshold according to memory for writing (#6920)
---
docs/UserGuide/Reference/DataNode-Config-Manual.md | 9 +++++
.../UserGuide/Reference/DataNode-Config-Manual.md | 10 ++---
.../iotdb/commons/concurrent/ThreadName.java | 1 +
.../resources/conf/iotdb-datanode.properties | 6 +--
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 16 +++++---
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 8 ++--
.../iotdb/db/engine/storagegroup/DataRegion.java | 22 +++++------
.../{StorageGroupInfo.java => DataRegionInfo.java} | 19 +++++----
.../db/engine/storagegroup/TsFileProcessor.java | 38 +++++++++---------
.../engine/storagegroup/TsFileProcessorInfo.java | 16 ++++----
.../org/apache/iotdb/db/rescon/SystemInfo.java | 46 +++++++++++-----------
.../engine/storagegroup/TsFileProcessorTest.java | 4 +-
.../engine/storagegroup/TsFileProcessorV2Test.java | 4 +-
13 files changed, 106 insertions(+), 93 deletions(-)
diff --git a/docs/UserGuide/Reference/DataNode-Config-Manual.md b/docs/UserGuide/Reference/DataNode-Config-Manual.md
index 9cec4f491d..ef9e034d52 100644
--- a/docs/UserGuide/Reference/DataNode-Config-Manual.md
+++ b/docs/UserGuide/Reference/DataNode-Config-Manual.md
@@ -460,6 +460,15 @@ The permission definitions are in ${IOTDB\_CONF}/conf/jmx.access.
|Default| 1073741824 |
|Effective| when enable\_mem\_control is false & After restarting system |
+* write\_memory\_variation\_report\_proportion
+
+| Name | write\_memory\_variation\_report\_proportion |
+| :---------: | :----------------------------------------------------------- |
+| Description | if memory cost of data region increased more than proportion of allocated memory for write, report to system |
+| Type | Double |
+| Default | 0.001 |
+| Effective | After restarting system |
+
* enable\_timed\_flush\_seq\_memtable
|Name| enable\_timed\_flush\_seq\_memtable |
diff --git a/docs/zh/UserGuide/Reference/DataNode-Config-Manual.md b/docs/zh/UserGuide/Reference/DataNode-Config-Manual.md
index 9c6b502910..6aade22032 100644
--- a/docs/zh/UserGuide/Reference/DataNode-Config-Manual.md
+++ b/docs/zh/UserGuide/Reference/DataNode-Config-Manual.md
@@ -799,13 +799,13 @@ IoTDB DataNode 与 Standalone 模式共用一套配置文件,均位于 IoTDB
|默认值| 0.8 |
|改后生效方式|重启服务生效|
-* storage\_group\_report\_threshold
+* write\_memory\_variation\_report\_proportion
-|名字| storage\_group\_report\_threshold |
+|名字| write\_memory\_variation\_report\_proportion |
|:---:|:---|
-|描述| 如果存储组的内存(以字节byte为单位)增加超过此阈值,则向系统报告。默认值为16MB |
-|类型| Int32 |
-|默认值| 16777216 |
+|描述| 如果 DataRegion 的内存增加超过写入可用内存的一定比例,则向系统报告。默认值为0.001 |
+|类型| Double |
+|默认值| 0.001 |
|改后生效方式|重启服务生效|
* max\_deduplicated\_path\_num
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 79ea7593cf..03610f68a9 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -34,6 +34,7 @@ public enum ThreadName {
DATA_REGION_RECOVER_SERVICE("Data-Region-Recover"),
FLUSH_SERVICE("Flush"),
FLUSH_SUB_TASK_SERVICE("Flush-SubTask"),
+ FLUSH_TASK_SUBMIT("FlushTask-Submit-Pool"),
COMPACTION_SERVICE("Compaction"),
COMPACTION_SUB_SERVICE("Sub-Compaction"),
COMPACTION_SCHEDULE("Compaction_Schedule"),
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 32d732d310..73150bd4be 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -485,9 +485,9 @@ timestamp_precision=ms
# Datatype: double
# reject_proportion=0.8
-# If memory (in byte) of storage group increased more than this threshold, report to system. The default value is 16MB
-# Datatype: long
-# storage_group_report_threshold=16777216
+# If memory cost of data region increased more than proportion of allocated memory for write, report to system. The default value is 0.001
+# Datatype: double
+# write_memory_variation_report_proportion=0.001
# allowed max numbers of deduplicated path in one query
# it's just an advised value, the real limitation will be the smaller one between this and the one we calculated
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 15c9531e79..4e89c78d0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -155,8 +155,12 @@ public class IoTDBConfig {
/** The proportion of write memory for compaction */
private double compactionProportion = 0.2;
- /** If storage group increased more than this threshold, report to system. Unit: byte */
- private long storageGroupSizeReportThreshold = 16 * 1024 * 1024L;
+ /**
+ * If memory cost of data region increased more than proportion of {@linkplain
+ * IoTDBConfig#getAllocateMemoryForStorageEngine()}*{@linkplain IoTDBConfig#getWriteProportion()},
+ * report to system.
+ */
+ private double writeMemoryVariationReportProportion = 0.001;
/** When inserting rejected, waiting period to check system again. Unit: millisecond */
private int checkPeriodWhenInsertBlocked = 50;
@@ -1780,12 +1784,12 @@ public class IoTDBConfig {
this.rejectProportion = rejectProportion;
}
- public long getStorageGroupSizeReportThreshold() {
- return storageGroupSizeReportThreshold;
+ public double getWriteMemoryVariationReportProportion() {
+ return writeMemoryVariationReportProportion;
}
- public void setStorageGroupSizeReportThreshold(long storageGroupSizeReportThreshold) {
- this.storageGroupSizeReportThreshold = storageGroupSizeReportThreshold;
+ public void setWriteMemoryVariationReportProportion(double writeMemoryVariationReportProportion) {
+ this.writeMemoryVariationReportProportion = writeMemoryVariationReportProportion;
}
public long getAllocateMemoryForStorageEngine() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 967d6ecc98..a2092c9cd5 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -294,11 +294,11 @@ public class IoTDBDescriptor {
properties.getProperty(
"reject_proportion", Double.toString(conf.getRejectProportion()))));
- conf.setStorageGroupSizeReportThreshold(
- Long.parseLong(
+ conf.setWriteMemoryVariationReportProportion(
+ Double.parseDouble(
properties.getProperty(
- "storage_group_report_threshold",
- Long.toString(conf.getStorageGroupSizeReportThreshold()))));
+ "write_memory_variation_report_proportion",
+ Double.toString(conf.getWriteMemoryVariationReportProportion()))));
conf.setMetaDataCacheEnable(
Boolean.parseBoolean(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index bea53583f4..0df5037bbf 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -254,7 +254,7 @@ public class DataRegion {
*/
private Map<Long, Long> partitionMaxFileVersions = new HashMap<>();
/** storage group info for mem control */
- private StorageGroupInfo storageGroupInfo = new StorageGroupInfo(this);
+ private DataRegionInfo dataRegionInfo = new DataRegionInfo(this);
/** whether it's ready from recovery */
private boolean isReady = false;
/** close file listeners */
@@ -767,7 +767,7 @@ public class DataRegion {
TsFileProcessor tsFileProcessor =
new TsFileProcessor(
dataRegionId,
- storageGroupInfo,
+ dataRegionInfo,
tsFileResource,
this::closeUnsealedTsFileProcessorCallBack,
isSeq ? this::updateLatestFlushTimeCallback : this::unsequenceFlushCallback,
@@ -783,9 +783,9 @@ public class DataRegion {
tsFileProcessor.setTimeRangeId(timePartitionId);
writer.makeMetadataVisible();
if (enableMemControl) {
- TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo);
+ TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(dataRegionInfo);
tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
- this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
+ this.dataRegionInfo.initTsFileProcessorInfo(tsFileProcessor);
// get chunkMetadata size
long chunkMetadataSize = 0;
for (Map<String, List<ChunkMetadata>> metaMap : writer.getMetadatasForQuery().values()) {
@@ -1550,7 +1550,7 @@ public class DataRegion {
new TsFileProcessor(
storageGroupName + FILE_NAME_SEPARATOR + dataRegionId,
fsFactory.getFileWithParent(filePath),
- storageGroupInfo,
+ dataRegionInfo,
this::closeUnsealedTsFileProcessorCallBack,
this::updateLatestFlushTimeCallback,
true);
@@ -1559,16 +1559,16 @@ public class DataRegion {
new TsFileProcessor(
storageGroupName + FILE_NAME_SEPARATOR + dataRegionId,
fsFactory.getFileWithParent(filePath),
- storageGroupInfo,
+ dataRegionInfo,
this::closeUnsealedTsFileProcessorCallBack,
this::unsequenceFlushCallback,
false);
}
if (enableMemControl) {
- TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo);
+ TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(dataRegionInfo);
tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
- this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
+ this.dataRegionInfo.initTsFileProcessorInfo(tsFileProcessor);
}
tsFileProcessor.addCloseFileListeners(customCloseFileListeners);
@@ -3330,10 +3330,6 @@ public class DataRegion {
return storageGroupName + File.separator + dataRegionId;
}
- public StorageGroupInfo getStorageGroupInfo() {
- return storageGroupInfo;
- }
-
/**
* Check if the data of "tsFileResource" all exist locally by comparing planIndexes in the
* partition of "partitionNumber". This is available only when the IoTDB instances which generated
@@ -3822,7 +3818,7 @@ public class DataRegion {
}
public long getMemCost() {
- return storageGroupInfo.getMemCost();
+ return dataRegionInfo.getMemCost();
}
@TestOnly
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegionInfo.java
similarity index 80%
rename from server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
rename to server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegionInfo.java
index 8577c427e7..f01139166b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegionInfo.java
@@ -26,26 +26,29 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
/** The storageGroupInfo records the total memory cost of the Storage Group. */
-public class StorageGroupInfo {
+public class DataRegionInfo {
- private DataRegion dataRegion;
+ private final DataRegion dataRegion;
/**
* The total Storage group memory cost, including unsealed TsFileResource, ChunkMetadata, WAL,
* primitive arrays and TEXT values
*/
- private AtomicLong memoryCost;
+ private final AtomicLong memoryCost;
/** The threshold of reporting it's size to SystemInfo */
- private long storageGroupSizeReportThreshold =
- IoTDBDescriptor.getInstance().getConfig().getStorageGroupSizeReportThreshold();
+ private final long storageGroupSizeReportThreshold =
+ (long)
+ (IoTDBDescriptor.getInstance().getConfig().getWriteMemoryVariationReportProportion()
+ * IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForStorageEngine()
+ * IoTDBDescriptor.getInstance().getConfig().getWriteProportion());
- private AtomicLong lastReportedSize = new AtomicLong();
+ private final AtomicLong lastReportedSize = new AtomicLong();
/** A set of all unclosed TsFileProcessors in this SG */
- private List<TsFileProcessor> reportedTsps = new CopyOnWriteArrayList<>();
+ private final List<TsFileProcessor> reportedTsps = new CopyOnWriteArrayList<>();
- public StorageGroupInfo(DataRegion dataRegion) {
+ public DataRegionInfo(DataRegion dataRegion) {
this.dataRegion = dataRegion;
memoryCost = new AtomicLong();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 841055e460..eab39c9218 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -108,7 +108,7 @@ public class TsFileProcessor {
private final boolean enableMemControl = config.isEnableMemControl();
/** storage group info for mem control */
- private StorageGroupInfo storageGroupInfo;
+ private DataRegionInfo dataRegionInfo;
/** tsfile processor info for mem control */
private TsFileProcessorInfo tsFileProcessorInfo;
@@ -171,14 +171,14 @@ public class TsFileProcessor {
TsFileProcessor(
String storageGroupName,
File tsfile,
- StorageGroupInfo storageGroupInfo,
+ DataRegionInfo dataRegionInfo,
CloseFileListener closeTsFileCallback,
UpdateEndTimeCallBack updateLatestFlushTimeCallback,
boolean sequence)
throws IOException {
this.storageGroupName = storageGroupName;
this.tsFileResource = new TsFileResource(tsfile, this);
- this.storageGroupInfo = storageGroupInfo;
+ this.dataRegionInfo = dataRegionInfo;
this.writer = new RestorableTsFileIOWriter(tsfile);
this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback;
this.sequence = sequence;
@@ -192,7 +192,7 @@ public class TsFileProcessor {
@SuppressWarnings("java:S107") // ignore number of arguments
public TsFileProcessor(
String storageGroupName,
- StorageGroupInfo storageGroupInfo,
+ DataRegionInfo dataRegionInfo,
TsFileResource tsFileResource,
CloseFileListener closeUnsealedTsFileProcessor,
UpdateEndTimeCallBack updateLatestFlushTimeCallback,
@@ -200,7 +200,7 @@ public class TsFileProcessor {
RestorableTsFileIOWriter writer) {
this.storageGroupName = storageGroupName;
this.tsFileResource = tsFileResource;
- this.storageGroupInfo = storageGroupInfo;
+ this.dataRegionInfo = dataRegionInfo;
this.writer = writer;
this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback;
this.sequence = sequence;
@@ -783,17 +783,17 @@ public class TsFileProcessor {
long memTableIncrement, long chunkMetadataIncrement, long textDataIncrement)
throws WriteProcessException {
memTableIncrement += textDataIncrement;
- storageGroupInfo.addStorageGroupMemCost(memTableIncrement);
+ dataRegionInfo.addStorageGroupMemCost(memTableIncrement);
tsFileProcessorInfo.addTSPMemCost(chunkMetadataIncrement);
- if (storageGroupInfo.needToReportToSystem()) {
+ if (dataRegionInfo.needToReportToSystem()) {
try {
- if (!SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo, this)) {
+ if (!SystemInfo.getInstance().reportStorageGroupStatus(dataRegionInfo, this)) {
StorageEngine.blockInsertionIfReject(this);
}
} catch (WriteProcessRejectException e) {
- storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
+ dataRegionInfo.releaseStorageGroupMemCost(memTableIncrement);
tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement);
- SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo);
+ SystemInfo.getInstance().resetStorageGroupStatus(dataRegionInfo);
throw e;
}
}
@@ -807,9 +807,9 @@ public class TsFileProcessor {
long chunkMetadataIncrement = memIncrements[2];
memTableIncrement += textDataIncrement;
- storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
+ dataRegionInfo.releaseStorageGroupMemCost(memTableIncrement);
tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement);
- SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo);
+ SystemInfo.getInstance().resetStorageGroupStatus(dataRegionInfo);
workMemTable.releaseTVListRamCost(memTableIncrement);
workMemTable.releaseTextDataSize(textDataIncrement);
}
@@ -839,7 +839,7 @@ public class TsFileProcessor {
}
for (ISyncManager syncManager :
SyncService.getInstance()
- .getOrCreateSyncManager(storageGroupInfo.getDataRegion().getDataRegionId())) {
+ .getOrCreateSyncManager(dataRegionInfo.getDataRegion().getDataRegionId())) {
syncManager.syncRealTimeDeletion(deletion);
}
} finally {
@@ -998,7 +998,7 @@ public class TsFileProcessor {
addAMemtableIntoFlushingList(tmpMemTable);
for (ISyncManager syncManager :
SyncService.getInstance()
- .getOrCreateSyncManager(storageGroupInfo.getDataRegion().getDataRegionId())) {
+ .getOrCreateSyncManager(dataRegionInfo.getDataRegion().getDataRegionId())) {
syncManager.syncRealTimeTsFile(tsFileResource.getTsFile());
}
logger.info("Memtable {} has been added to flushing list", tmpMemTable);
@@ -1168,7 +1168,7 @@ public class TsFileProcessor {
MemTableManager.getInstance().decreaseMemtableNumber();
if (enableMemControl) {
// reset the mem cost in StorageGroupProcessorInfo
- storageGroupInfo.releaseStorageGroupMemCost(memTable.getTVListsRamCost());
+ dataRegionInfo.releaseStorageGroupMemCost(memTable.getTVListsRamCost());
if (logger.isDebugEnabled()) {
logger.debug(
"[mem control] {}: {} flush finished, try to reset system memcost, "
@@ -1178,7 +1178,7 @@ public class TsFileProcessor {
flushingMemTables.size());
}
// report to System
- SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo);
+ SystemInfo.getInstance().resetStorageGroupStatus(dataRegionInfo);
SystemInfo.getInstance().resetFlushingMemTableCost(memTable.getTVListsRamCost());
}
if (logger.isDebugEnabled()) {
@@ -1430,7 +1430,7 @@ public class TsFileProcessor {
tsFileResource.serialize();
for (ISyncManager syncManager :
SyncService.getInstance()
- .getOrCreateSyncManager(storageGroupInfo.getDataRegion().getDataRegionId())) {
+ .getOrCreateSyncManager(dataRegionInfo.getDataRegion().getDataRegionId())) {
syncManager.syncRealTimeResource(tsFileResource.getTsFile());
}
logger.info("Ended file {}", tsFileResource);
@@ -1443,7 +1443,7 @@ public class TsFileProcessor {
if (enableMemControl) {
tsFileProcessorInfo.clear();
- storageGroupInfo.closeTsFileProcessorAndReportToSystem(this);
+ dataRegionInfo.closeTsFileProcessorAndReportToSystem(this);
}
if (logger.isInfoEnabled()) {
long closeEndTime = System.currentTimeMillis();
@@ -1637,7 +1637,7 @@ public class TsFileProcessor {
}
public void submitAFlushTask() {
- this.storageGroupInfo.getDataRegion().submitAFlushTaskWhenShouldFlush(this);
+ this.dataRegionInfo.getDataRegion().submitAFlushTaskWhenShouldFlush(this);
}
public boolean alreadyMarkedClosing() {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorInfo.java
index 7d0b419a39..502f379641 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorInfo.java
@@ -24,37 +24,37 @@ import org.apache.iotdb.db.service.metrics.MetricService;
public class TsFileProcessorInfo {
/** Once tspInfo updated, report to storageGroupInfo that this TSP belongs to. */
- private StorageGroupInfo storageGroupInfo;
+ private DataRegionInfo dataRegionInfo;
/** memory occupation of unsealed TsFileResource, ChunkMetadata, WAL */
private long memCost;
- public TsFileProcessorInfo(StorageGroupInfo storageGroupInfo) {
- this.storageGroupInfo = storageGroupInfo;
+ public TsFileProcessorInfo(DataRegionInfo dataRegionInfo) {
+ this.dataRegionInfo = dataRegionInfo;
this.memCost = 0L;
- if (null != storageGroupInfo.getDataRegion()) {
+ if (null != dataRegionInfo.getDataRegion()) {
MetricService.getInstance()
.addMetricSet(
new TsFileProcessorInfoMetrics(
- storageGroupInfo.getDataRegion().getStorageGroupName(), memCost));
+ dataRegionInfo.getDataRegion().getStorageGroupName(), memCost));
}
}
/** called in each insert */
public void addTSPMemCost(long cost) {
memCost += cost;
- storageGroupInfo.addStorageGroupMemCost(cost);
+ dataRegionInfo.addStorageGroupMemCost(cost);
}
/** called when meet exception */
public void releaseTSPMemCost(long cost) {
- storageGroupInfo.releaseStorageGroupMemCost(cost);
+ dataRegionInfo.releaseStorageGroupMemCost(cost);
memCost -= cost;
}
/** called when closing TSP */
public void clear() {
- storageGroupInfo.releaseStorageGroupMemCost(memCost);
+ dataRegionInfo.releaseStorageGroupMemCost(memCost);
memCost = 0L;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index 293f9d635a..710a523ec3 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -20,11 +20,12 @@
package org.apache.iotdb.db.rescon;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.flush.FlushManager;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupInfo;
+import org.apache.iotdb.db.engine.storagegroup.DataRegionInfo;
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.apache.iotdb.db.exception.WriteProcessRejectException;
@@ -47,14 +48,13 @@ public class SystemInfo {
private long memorySizeForWrite;
private long memorySizeForCompaction;
-
- private Map<StorageGroupInfo, Long> reportedStorageGroupMemCostMap = new HashMap<>();
+ private Map<DataRegionInfo, Long> reportedStorageGroupMemCostMap = new HashMap<>();
private long flushingMemTablesCost = 0L;
private AtomicLong compactionMemoryCost = new AtomicLong(0L);
private ExecutorService flushTaskSubmitThreadPool =
- IoTDBThreadPoolFactory.newSingleThreadExecutor("FlushTask-Submit-Pool");
+ IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.FLUSH_TASK_SUBMIT.getName());
private double FLUSH_THERSHOLD = memorySizeForWrite * config.getFlushProportion();
private double REJECT_THERSHOLD = memorySizeForWrite * config.getRejectProportion();
@@ -68,15 +68,15 @@ public class SystemInfo {
* Report current mem cost of storage group to system. Called when the memory of storage group
* newly accumulates to IoTDBConfig.getStorageGroupSizeReportThreshold()
*
- * @param storageGroupInfo storage group
+ * @param dataRegionInfo storage group
* @throws WriteProcessRejectException
*/
public synchronized boolean reportStorageGroupStatus(
- StorageGroupInfo storageGroupInfo, TsFileProcessor tsFileProcessor)
+ DataRegionInfo dataRegionInfo, TsFileProcessor tsFileProcessor)
throws WriteProcessRejectException {
long delta =
- storageGroupInfo.getMemCost()
- - reportedStorageGroupMemCostMap.getOrDefault(storageGroupInfo, 0L);
+ dataRegionInfo.getMemCost()
+ - reportedStorageGroupMemCostMap.getOrDefault(dataRegionInfo, 0L);
totalStorageGroupMemCost += delta;
if (logger.isDebugEnabled()) {
logger.debug(
@@ -85,8 +85,8 @@ public class SystemInfo {
delta,
totalStorageGroupMemCost);
}
- reportedStorageGroupMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost());
- storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
+ reportedStorageGroupMemCostMap.put(dataRegionInfo, dataRegionInfo.getMemCost());
+ dataRegionInfo.setLastReportedSize(dataRegionInfo.getMemCost());
if (totalStorageGroupMemCost < FLUSH_THERSHOLD) {
return true;
} else if (totalStorageGroupMemCost >= FLUSH_THERSHOLD
@@ -100,7 +100,7 @@ public class SystemInfo {
} else {
logger.info(
"Change system to reject status. Triggered by: logical SG ({}), mem cost delta ({}), totalSgMemCost ({}).",
- storageGroupInfo.getDataRegion().getStorageGroupName(),
+ dataRegionInfo.getDataRegion().getStorageGroupName(),
delta,
totalStorageGroupMemCost);
rejected = true;
@@ -124,29 +124,29 @@ public class SystemInfo {
* Report resetting the mem cost of sg to system. It will be called after flushing, closing and
* failed to insert
*
- * @param storageGroupInfo storage group
+ * @param dataRegionInfo storage group
*/
- public synchronized void resetStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
+ public synchronized void resetStorageGroupStatus(DataRegionInfo dataRegionInfo) {
long delta = 0;
- if (reportedStorageGroupMemCostMap.containsKey(storageGroupInfo)) {
- delta = reportedStorageGroupMemCostMap.get(storageGroupInfo) - storageGroupInfo.getMemCost();
+ if (reportedStorageGroupMemCostMap.containsKey(dataRegionInfo)) {
+ delta = reportedStorageGroupMemCostMap.get(dataRegionInfo) - dataRegionInfo.getMemCost();
this.totalStorageGroupMemCost -= delta;
- storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
- reportedStorageGroupMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost());
+ dataRegionInfo.setLastReportedSize(dataRegionInfo.getMemCost());
+ reportedStorageGroupMemCostMap.put(dataRegionInfo, dataRegionInfo.getMemCost());
}
if (totalStorageGroupMemCost >= FLUSH_THERSHOLD
&& totalStorageGroupMemCost < REJECT_THERSHOLD) {
logger.debug(
"SG ({}) released memory (delta: {}) but still exceeding flush proportion (totalSgMemCost: {}), call flush.",
- storageGroupInfo.getDataRegion().getStorageGroupName(),
+ dataRegionInfo.getDataRegion().getStorageGroupName(),
delta,
totalStorageGroupMemCost);
if (rejected) {
logger.info(
"SG ({}) released memory (delta: {}), set system to normal status (totalSgMemCost: {}).",
- storageGroupInfo.getDataRegion().getStorageGroupName(),
+ dataRegionInfo.getDataRegion().getStorageGroupName(),
delta,
totalStorageGroupMemCost);
}
@@ -155,7 +155,7 @@ public class SystemInfo {
} else if (totalStorageGroupMemCost >= REJECT_THERSHOLD) {
logger.warn(
"SG ({}) released memory (delta: {}), but system is still in reject status (totalSgMemCost: {}).",
- storageGroupInfo.getDataRegion().getStorageGroupName(),
+ dataRegionInfo.getDataRegion().getStorageGroupName(),
delta,
totalStorageGroupMemCost);
logCurrentTotalSGMemory();
@@ -163,7 +163,7 @@ public class SystemInfo {
} else {
logger.debug(
"SG ({}) released memory (delta: {}), system is in normal status (totalSgMemCost: {}).",
- storageGroupInfo.getDataRegion().getStorageGroupName(),
+ dataRegionInfo.getDataRegion().getStorageGroupName(),
delta,
totalStorageGroupMemCost);
logCurrentTotalSGMemory();
@@ -225,8 +225,8 @@ public class SystemInfo {
PriorityQueue<TsFileProcessor> allTsFileProcessors =
new PriorityQueue<>(
(o1, o2) -> Long.compare(o2.getWorkMemTableRamCost(), o1.getWorkMemTableRamCost()));
- for (StorageGroupInfo storageGroupInfo : reportedStorageGroupMemCostMap.keySet()) {
- allTsFileProcessors.addAll(storageGroupInfo.getAllReportedTsp());
+ for (DataRegionInfo dataRegionInfo : reportedStorageGroupMemCostMap.keySet()) {
+ allTsFileProcessors.addAll(dataRegionInfo.getAllReportedTsp());
}
boolean isCurrentTsFileProcessorSelected = false;
long memCost = 0;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
index dbcb619d75..73c9010fd2 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
@@ -72,7 +72,7 @@ public class TsFileProcessorTest {
private TsFileProcessor processor;
private String storageGroup = "root.vehicle";
private final String systemDir = TestConstant.OUTPUT_DATA_DIR.concat("info");
- private StorageGroupInfo sgInfo;
+ private DataRegionInfo sgInfo;
private String filePath = TestConstant.getTestTsFilePath("root.vehicle", 0, 0, 0);
private String deviceId = "root.vehicle.d0";
private String measurementId = "s0";
@@ -89,7 +89,7 @@ public class TsFileProcessorTest {
Assert.assertTrue(file.getParentFile().mkdirs());
}
EnvironmentUtils.envSetUp();
- sgInfo = new StorageGroupInfo(new DataRegionTest.DummyDataRegion(systemDir, storageGroup));
+ sgInfo = new DataRegionInfo(new DataRegionTest.DummyDataRegion(systemDir, storageGroup));
MetadataManagerHelper.initMetadata();
context = EnvironmentUtils.TEST_QUERY_CONTEXT;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java
index 6a1cf93a46..6b006f8966 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java
@@ -70,7 +70,7 @@ public class TsFileProcessorV2Test {
private TsFileProcessor processor;
private final String storageGroup = "root.vehicle";
- private StorageGroupInfo sgInfo;
+ private DataRegionInfo sgInfo;
private final String filePath = TestConstant.getTestTsFilePath("root.vehicle", 0, 0, 0);
private final String deviceId = "root.vehicle.d0";
private final String measurementId = "s0";
@@ -90,7 +90,7 @@ public class TsFileProcessorV2Test {
Assert.assertTrue(file.getParentFile().mkdirs());
}
EnvironmentUtils.envSetUp();
- sgInfo = new StorageGroupInfo(new DataRegionTest.DummyDataRegion(systemDir, storageGroup));
+ sgInfo = new DataRegionInfo(new DataRegionTest.DummyDataRegion(systemDir, storageGroup));
MetadataManagerHelper.initMetadata();
context = EnvironmentUtils.TEST_QUERY_CONTEXT;
}