You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/11/05 12:47:44 UTC
[iotdb] 01/01: fix some mem_control bug
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch fix_mem_control_bug11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d489968ec5648e3590e114f03de0cf8009459be5
Author: HTHou <hh...@outlook.com>
AuthorDate: Thu Nov 5 20:37:50 2020 +0800
fix some mem_control bug
---
.../db/engine/storagegroup/StorageGroupInfo.java | 2 +-
.../engine/storagegroup/StorageGroupProcessor.java | 12 ++++
.../db/engine/storagegroup/TsFileProcessor.java | 12 ++--
.../org/apache/iotdb/db/rescon/SystemInfo.java | 84 ++++++++++++----------
4 files changed, 65 insertions(+), 45 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
index 4d60c52..57de6eb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
@@ -100,6 +100,6 @@ public class StorageGroupInfo {
*/
public void closeTsFileProcessorAndReportToSystem(TsFileProcessor tsFileProcessor) {
reportedTsps.remove(tsFileProcessor);
- SystemInfo.getInstance().resetStorageGroupStatus(this);
+ SystemInfo.getInstance().resetStorageGroupStatus(this, true);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 8673544..5a8cc28 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -913,6 +913,18 @@ public class StorageGroupProcessor {
}
}
+ public void flushATsFileProcessor(TsFileProcessor tsFileProcessor) {
+ writeLock();
+ try {
+ // check memtable size and may async try to flush the work memtable
+ if (tsFileProcessor.shouldFlush()) {
+ fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
private TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, boolean sequence) {
TsFileProcessor tsFileProcessor = null;
try {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index b74a42e..2e35cb5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -293,7 +293,7 @@ public class TsFileProcessor {
} catch (WriteProcessException e) {
storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
tsFileProcessorInfo.releaseTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement);
- SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo);
+ SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, false);
throw e;
}
}
@@ -357,7 +357,7 @@ public class TsFileProcessor {
} catch (WriteProcessException e) {
storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
tsFileProcessorInfo.releaseTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement);
- SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo);
+ SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, false);
throw e;
}
}
@@ -677,7 +677,7 @@ public class TsFileProcessor {
// For text type data, reset the mem cost in tsFileProcessorInfo
storageGroupInfo.releaseStorageGroupMemCost(memTable.getTVListsRamCost());
// report to System
- SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo);
+ SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, true);
}
if (logger.isDebugEnabled()) {
logger.debug("{}: {} flush finished, remove a memtable from flushing list, "
@@ -972,10 +972,8 @@ public class TsFileProcessor {
return sequence;
}
- public void startClose() {
- storageGroupInfo.getStorageGroupProcessor().asyncCloseOneTsFileProcessor(sequence, this);
- logger.info("Async close tsfile: {}",
- getTsFileResource().getTsFile().getAbsolutePath());
+ public void startFlush() {
+ storageGroupInfo.getStorageGroupProcessor().flushATsFileProcessor(this);
}
public void setFlush() {
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index ab3e25a..aa186ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -20,10 +20,12 @@
package org.apache.iotdb.db.rescon;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.flush.FlushManager;
@@ -37,10 +39,10 @@ public class SystemInfo {
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final Logger logger = LoggerFactory.getLogger(SystemInfo.class);
- private long totalSgMemCost;
+ private AtomicLong totalSgMemCost = new AtomicLong();
private volatile boolean rejected = false;
- private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new HashMap<>();
+ private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new ConcurrentHashMap<>();
private static final double FLUSH_PROPORTION = config.getFlushProportion();
private static final double REJECT_PROPORTION = config.getRejectProportion();
@@ -51,20 +53,20 @@ public class SystemInfo {
*
* @param storageGroupInfo storage group
*/
- public synchronized void reportStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
+ public void reportStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
long delta = storageGroupInfo.getSgMemCost() -
reportedSgMemCostMap.getOrDefault(storageGroupInfo, 0L);
- totalSgMemCost += delta;
+ totalSgMemCost.addAndGet(delta);
logger.debug("Report Storage Group Status to the system. "
+ "After adding {}, current sg mem cost is {}.", delta, totalSgMemCost);
reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getSgMemCost());
storageGroupInfo.setLastReportedSize(storageGroupInfo.getSgMemCost());
- if (totalSgMemCost >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
+ if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
logger.debug("The total storage group mem costs are too large, call for flushing. "
+ "Current sg cost is {}", totalSgMemCost);
flush();
}
- if (totalSgMemCost >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+ if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
logger.info("Change system to reject status...");
rejected = true;
}
@@ -72,32 +74,44 @@ public class SystemInfo {
/**
* Report resetting the mem cost of sg to system.
- * It will be invoked after flushing, closing and failed to insert
+ * It will be called after flushing, closing and failed to insert
*
* @param storageGroupInfo storage group
*/
- public synchronized void resetStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
+ public void resetStorageGroupStatus(StorageGroupInfo storageGroupInfo,
+ boolean shouldInvokeFlush) {
if (reportedSgMemCostMap.containsKey(storageGroupInfo)) {
- this.totalSgMemCost -= reportedSgMemCostMap.get(storageGroupInfo)
- - storageGroupInfo.getSgMemCost();
+ this.totalSgMemCost.addAndGet(storageGroupInfo.getSgMemCost() -
+ reportedSgMemCostMap.get(storageGroupInfo));
storageGroupInfo.setLastReportedSize(storageGroupInfo.getSgMemCost());
- if (totalSgMemCost > config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
- logger.debug("Some sg memory released but still exceeding flush proportion, call flush.");
- logCurrentTotalSGMemory();
- forceFlush();
+ reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getSgMemCost());
+ if (shouldInvokeFlush) {
+ checkSystemToInvokeFlush();
}
- if (totalSgMemCost < config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
- if (rejected) {
- logger.info("Some sg memory released, set system to normal status.");
- }
- logCurrentTotalSGMemory();
- rejected = false;
- } else {
- logger.warn("Some sg memory released, but system is still in reject status.");
- logCurrentTotalSGMemory();
- rejected = true;
+ }
+ }
+
+ private void checkSystemToInvokeFlush() {
+ if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION &&
+ totalSgMemCost.get() < config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+ logger.debug("Some sg memory released but still exceeding flush proportion, call flush.");
+ if (rejected) {
+ logger.info("Some sg memory released, set system to normal status.");
}
- reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getSgMemCost());
+ logCurrentTotalSGMemory();
+ rejected = false;
+ forceAsyncFlush();
+ }
+ else if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+ logger.warn("Some sg memory released, but system is still in reject status.");
+ logCurrentTotalSGMemory();
+ rejected = true;
+ forceAsyncFlush();
+ }
+ else {
+ logger.debug("Some sg memory released, system is in normal status.");
+ logCurrentTotalSGMemory();
+ rejected = false;
}
}
@@ -127,18 +141,14 @@ public class SystemInfo {
}
}
- public void forceFlush() {
- if (FlushManager.getInstance().getNumberOfWorkingTasks() > 0) {
- return;
- }
+ /**
+ * Be Careful!! This method can only be called by flush thread!
+ */
+ public void forceAsyncFlush() {
List<TsFileProcessor> processors = getTsFileProcessorsToFlush();
for (TsFileProcessor processor : processors) {
if (processor != null) {
- if (processor.shouldClose()) {
- processor.startClose();
- } else {
- processor.asyncFlush();
- }
+ processor.startFlush();
}
}
}
@@ -151,7 +161,7 @@ public class SystemInfo {
}
List<TsFileProcessor> processors = new ArrayList<>();
long memCost = 0;
- while (totalSgMemCost - memCost > config.getAllocateMemoryForWrite() *
+ while (totalSgMemCost.get() - memCost > config.getAllocateMemoryForWrite() *
FLUSH_PROPORTION / 2) {
if (tsps.isEmpty() || tsps.peek().getWorkMemTableRamCost() == 0) {
return processors;
@@ -169,7 +179,7 @@ public class SystemInfo {
public void close() {
reportedSgMemCostMap.clear();
- totalSgMemCost = 0;
+ totalSgMemCost.set(0);
rejected = false;
}