You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/03/20 06:45:31 UTC

[iotdb] branch master updated: [IOTDB-1274] fix the insert blocking caused by bugs in the mem control module

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ffd095  [IOTDB-1274] fix the insert blocking caused by bugs in the mem control module
6ffd095 is described below

commit 6ffd0951478fbd6a2c0bde8092c187a61ef50e0f
Author: HTHou <hh...@outlook.com>
AuthorDate: Fri Mar 19 19:04:38 2021 +0800

    [IOTDB-1274] fix the insert blocking caused by bugs in the mem control module
---
 .../org/apache/iotdb/db/rescon/SystemInfo.java     | 56 +++++++++++-----------
 1 file changed, 29 insertions(+), 27 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index 8fb84f1..b6d587e 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -88,37 +88,39 @@ public class SystemInfo {
    *
    * @param storageGroupInfo storage group
    */
-  public synchronized void resetStorageGroupStatus(
+  public void resetStorageGroupStatus(
       StorageGroupInfo storageGroupInfo, boolean shouldInvokeFlush) {
-    if (reportedSgMemCostMap.containsKey(storageGroupInfo)) {
-      this.totalSgMemCost -=
-          (reportedSgMemCostMap.get(storageGroupInfo) - storageGroupInfo.getMemCost());
-      storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
-      reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost());
-      if (shouldInvokeFlush) {
-        checkSystemToInvokeFlush();
+    boolean needForceAsyncFlush = false;
+    synchronized (this) {
+      if (reportedSgMemCostMap.containsKey(storageGroupInfo)) {
+        this.totalSgMemCost -=
+            (reportedSgMemCostMap.get(storageGroupInfo) - storageGroupInfo.getMemCost());
+        storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
+        reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost());
       }
-    }
-  }
 
-  private void checkSystemToInvokeFlush() {
-    if (totalSgMemCost >= FLUSH_THERSHOLD && totalSgMemCost < REJECT_THERSHOLD) {
-      logger.debug("Some sg memory released but still exceeding flush proportion, call flush.");
-      if (rejected) {
-        logger.info("Some sg memory released, set system to normal status.");
+      if (totalSgMemCost >= FLUSH_THERSHOLD && totalSgMemCost < REJECT_THERSHOLD) {
+        logger.debug("Some sg memory released but still exceeding flush proportion, call flush.");
+        if (rejected) {
+          logger.info("Some sg memory released, set system to normal status.");
+        }
+        logCurrentTotalSGMemory();
+        rejected = false;
+        needForceAsyncFlush = true;
+      } else if (totalSgMemCost >= REJECT_THERSHOLD) {
+        logger.warn("Some sg memory released, but system is still in reject status.");
+        logCurrentTotalSGMemory();
+        rejected = true;
+        needForceAsyncFlush = true;
+
+      } else {
+        logger.debug("Some sg memory released, system is in normal status.");
+        logCurrentTotalSGMemory();
+        rejected = false;
       }
-      logCurrentTotalSGMemory();
-      rejected = false;
-      forceAsyncFlush();
-    } else if (totalSgMemCost >= REJECT_THERSHOLD) {
-      logger.warn("Some sg memory released, but system is still in reject status.");
-      logCurrentTotalSGMemory();
-      rejected = true;
+    }
+    if (shouldInvokeFlush && needForceAsyncFlush) {
       forceAsyncFlush();
-    } else {
-      logger.debug("Some sg memory released, system is in normal status.");
-      logCurrentTotalSGMemory();
-      rejected = false;
     }
   }
 
@@ -150,7 +152,7 @@ public class SystemInfo {
 
   /** Be Careful!! This method can only be called by flush thread! */
   private void forceAsyncFlush() {
-    if (FlushManager.getInstance().getNumberOfWorkingTasks() > 0) {
+    if (FlushManager.getInstance().getNumberOfWorkingTasks() > 1) {
       return;
     }
     List<TsFileProcessor> processors = getTsFileProcessorsToFlush();