You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/11/06 09:57:58 UTC
[iotdb] branch rel/0.11 updated: [To rel/0.11] Fix mem control step 2 (#1971)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new 9f67eca [To rel/0.11] Fix mem control step 2 (#1971)
9f67eca is described below
commit 9f67eca5dab3fb658931f4d81349eccedd757c6b
Author: Haonan <hh...@outlook.com>
AuthorDate: Fri Nov 6 17:57:42 2020 +0800
[To rel/0.11] Fix mem control step 2 (#1971)
---
.../db/engine/storagegroup/StorageGroupInfo.java | 19 ++++++------
.../engine/storagegroup/StorageGroupProcessor.java | 9 +++---
.../db/engine/storagegroup/TsFileProcessor.java | 9 +++++-
.../engine/storagegroup/TsFileProcessorInfo.java | 4 ++-
.../org/apache/iotdb/db/rescon/SystemInfo.java | 34 +++++++++++++---------
5 files changed, 46 insertions(+), 29 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
index 57de6eb..a31d41a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
@@ -18,8 +18,8 @@
*/
package org.apache.iotdb.db.engine.storagegroup;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -33,7 +33,8 @@ public class StorageGroupInfo {
private StorageGroupProcessor storageGroupProcessor;
/**
- * The total Storage group memory cost
+ * The total Storage group memory cost,
+ * including unsealed TsFileResource, ChunkMetadata, WAL, primitive arrays and TEXT values
*/
private AtomicLong memoryCost;
@@ -43,12 +44,12 @@ public class StorageGroupInfo {
private long storageGroupSizeReportThreshold =
IoTDBDescriptor.getInstance().getConfig().getStorageGroupSizeReportThreshold();
- private long lastReportedSize = 0L;
+ private AtomicLong lastReportedSize = new AtomicLong();
/**
* A set of all unclosed TsFileProcessors in this SG
*/
- private Set<TsFileProcessor> reportedTsps = new HashSet<>();
+ private List<TsFileProcessor> reportedTsps = new CopyOnWriteArrayList<>();
public StorageGroupInfo(StorageGroupProcessor storageGroupProcessor) {
this.storageGroupProcessor = storageGroupProcessor;
@@ -76,20 +77,20 @@ public class StorageGroupInfo {
memoryCost.getAndAdd(-cost);
}
- public long getSgMemCost() {
+ public long getMemCost() {
return memoryCost.get();
}
- public Set<TsFileProcessor> getAllReportedTsp() {
+ public List<TsFileProcessor> getAllReportedTsp() {
return reportedTsps;
}
public boolean needToReportToSystem() {
- return memoryCost.get() - lastReportedSize > storageGroupSizeReportThreshold;
+ return memoryCost.get() - lastReportedSize.get() > storageGroupSizeReportThreshold;
}
public void setLastReportedSize(long size) {
- lastReportedSize = size;
+ lastReportedSize.set(size);
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 17d09d6..2b49583 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -914,13 +914,12 @@ public class StorageGroupProcessor {
}
public void asyncFlushMemTableInTsFileProcessor(TsFileProcessor tsFileProcessor) {
- if (closingSequenceTsFileProcessor.contains(tsFileProcessor) ||
- closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
- return;
- }
writeLock();
try {
- fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
+ if (!closingSequenceTsFileProcessor.contains(tsFileProcessor) &&
+ !closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
+ fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
+ }
} finally {
writeUnlock();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 5e5f083..b810774 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -421,6 +421,8 @@ public class TsFileProcessor {
return false;
}
if (shouldFlush) {
+ logger.info("The memtable size {} of tsfile {} reaches the mem control threshold",
+ workMemTable.memSize(), tsFileResource.getTsFile().getAbsolutePath());
return true;
}
if (!enableMemControl && workMemTable.memSize() >= getMemtableSizeThresholdBasedOnSeriesNum()) {
@@ -676,8 +678,13 @@ public class TsFileProcessor {
}
memTable.release();
if (enableMemControl) {
- // For text type data, reset the mem cost in tsFileProcessorInfo
+ // reset the mem cost in StorageGroupProcessorInfo
storageGroupInfo.releaseStorageGroupMemCost(memTable.getTVListsRamCost());
+ if (logger.isDebugEnabled()) {
+ logger.debug("[mem control] {}: {} flush finished, try to reset system memcost, "
+ + "flushing memtable list size: {}", storageGroupName,
+ tsFileResource.getTsFile().getName(), flushingMemTables.size());
+ }
// report to System
SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, true);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorInfo.java
index 52bc863..417168e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorInfo.java
@@ -30,7 +30,9 @@ public class TsFileProcessorInfo {
*/
private StorageGroupInfo storageGroupInfo;
- // unsealed TsFileResource, ChunkMetadata, WAL
+ /**
+ * memory occupation of unsealed TsFileResource, ChunkMetadata, WAL
+ */
private long memCost;
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index fa1715e..74c00f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -54,17 +54,19 @@ public class SystemInfo {
* @param storageGroupInfo storage group
*/
public void reportStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
- long delta = storageGroupInfo.getSgMemCost() -
+ long delta = storageGroupInfo.getMemCost() -
reportedSgMemCostMap.getOrDefault(storageGroupInfo, 0L);
totalSgMemCost.addAndGet(delta);
- logger.debug("Report Storage Group Status to the system. "
+ if (logger.isDebugEnabled()) {
+ logger.debug("Report Storage Group Status to the system. "
+ "After adding {}, current sg mem cost is {}.", delta, totalSgMemCost);
- reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getSgMemCost());
- storageGroupInfo.setLastReportedSize(storageGroupInfo.getSgMemCost());
+ }
+ reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost());
+ storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
logger.debug("The total storage group mem costs are too large, call for flushing. "
+ "Current sg cost is {}", totalSgMemCost);
- flush();
+ chooseTSPToMarkFlush();
}
if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
logger.info("Change system to reject status...");
@@ -81,10 +83,10 @@ public class SystemInfo {
public void resetStorageGroupStatus(StorageGroupInfo storageGroupInfo,
boolean shouldInvokeFlush) {
if (reportedSgMemCostMap.containsKey(storageGroupInfo)) {
- this.totalSgMemCost.addAndGet(storageGroupInfo.getSgMemCost() -
+ this.totalSgMemCost.addAndGet(storageGroupInfo.getMemCost() -
reportedSgMemCostMap.get(storageGroupInfo));
- storageGroupInfo.setLastReportedSize(storageGroupInfo.getSgMemCost());
- reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getSgMemCost());
+ storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
+ reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost());
if (shouldInvokeFlush) {
checkSystemToInvokeFlush();
}
@@ -120,11 +122,11 @@ public class SystemInfo {
}
/**
- * Flush the tsfileProcessor in SG with the max mem cost. If the queue size of flushing >
- * threshold, it's identified as flushing is in progress.
+ * Order all tsfileProcessors in system by memory cost of actual data points in memtable.
+ * Mark the top K TSPs as to be flushed,
+ * so that after flushing the K TSPs, the memory cost should be less than FLUSH_THRESHOLD
*/
- public void flush() {
-
+ private void chooseTSPToMarkFlush() {
if (FlushManager.getInstance().getNumberOfWorkingTasks() > 0) {
return;
}
@@ -144,8 +146,14 @@ public class SystemInfo {
/**
* Be Careful!! This method can only be called by flush thread!
*/
- public void forceAsyncFlush() {
+ private void forceAsyncFlush() {
+ if (FlushManager.getInstance().getNumberOfWorkingTasks() > 0) {
+ return;
+ }
List<TsFileProcessor> processors = getTsFileProcessorsToFlush();
+ if (logger.isDebugEnabled()) {
+ logger.debug("[mem control] get {} tsp to flush", processors.size());
+ }
for (TsFileProcessor processor : processors) {
if (processor != null) {
processor.startAsyncFlush();