You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/11/05 12:47:44 UTC

[iotdb] 01/01: fix some mem_control bug

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch fix_mem_control_bug11
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d489968ec5648e3590e114f03de0cf8009459be5
Author: HTHou <hh...@outlook.com>
AuthorDate: Thu Nov 5 20:37:50 2020 +0800

    fix some mem_control bug
---
 .../db/engine/storagegroup/StorageGroupInfo.java   |  2 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 12 ++++
 .../db/engine/storagegroup/TsFileProcessor.java    | 12 ++--
 .../org/apache/iotdb/db/rescon/SystemInfo.java     | 84 ++++++++++++----------
 4 files changed, 65 insertions(+), 45 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
index 4d60c52..57de6eb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
@@ -100,6 +100,6 @@ public class StorageGroupInfo {
    */
   public void closeTsFileProcessorAndReportToSystem(TsFileProcessor tsFileProcessor) {
     reportedTsps.remove(tsFileProcessor);
-    SystemInfo.getInstance().resetStorageGroupStatus(this);
+    SystemInfo.getInstance().resetStorageGroupStatus(this, true);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 8673544..5a8cc28 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -913,6 +913,18 @@ public class StorageGroupProcessor {
     }
   }
 
+  public void flushATsFileProcessor(TsFileProcessor tsFileProcessor) {
+    writeLock();
+    try {
+      // check the memtable size and, if needed, try to flush the work memtable asynchronously
+      if (tsFileProcessor.shouldFlush()) {
+        fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
+      }
+    } finally {
+      writeUnlock();
+    }
+  }
+
   private TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, boolean sequence) {
     TsFileProcessor tsFileProcessor = null;
     try {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index b74a42e..2e35cb5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -293,7 +293,7 @@ public class TsFileProcessor {
       } catch (WriteProcessException e) {
         storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
         tsFileProcessorInfo.releaseTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement);
-        SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo);
+        SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, false);
         throw e;
       }
     }
@@ -357,7 +357,7 @@ public class TsFileProcessor {
       } catch (WriteProcessException e) {
         storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
         tsFileProcessorInfo.releaseTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement);
-        SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo);
+        SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, false);
         throw e;
       }
     }
@@ -677,7 +677,7 @@ public class TsFileProcessor {
         // For text type data, reset the mem cost in tsFileProcessorInfo
         storageGroupInfo.releaseStorageGroupMemCost(memTable.getTVListsRamCost());
         // report to System
-        SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo);
+        SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, true);
       }
       if (logger.isDebugEnabled()) {
         logger.debug("{}: {} flush finished, remove a memtable from flushing list, "
@@ -972,10 +972,8 @@ public class TsFileProcessor {
     return sequence;
   }
 
-  public void startClose() {
-    storageGroupInfo.getStorageGroupProcessor().asyncCloseOneTsFileProcessor(sequence, this);
-    logger.info("Async close tsfile: {}",
-        getTsFileResource().getTsFile().getAbsolutePath());
+  public void startFlush() {
+    storageGroupInfo.getStorageGroupProcessor().flushATsFileProcessor(this);
   }
 
   public void setFlush() {
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index ab3e25a..aa186ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -20,10 +20,12 @@
 package org.apache.iotdb.db.rescon;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.flush.FlushManager;
@@ -37,10 +39,10 @@ public class SystemInfo {
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private static final Logger logger = LoggerFactory.getLogger(SystemInfo.class);
 
-  private long totalSgMemCost;
+  private AtomicLong totalSgMemCost = new AtomicLong();
   private volatile boolean rejected = false;
 
-  private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new HashMap<>();
+  private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new ConcurrentHashMap<>();
 
   private static final double FLUSH_PROPORTION = config.getFlushProportion();
   private static final double REJECT_PROPORTION = config.getRejectProportion();
@@ -51,20 +53,20 @@ public class SystemInfo {
    *
    * @param storageGroupInfo storage group
    */
-  public synchronized void reportStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
+  public void reportStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
     long delta = storageGroupInfo.getSgMemCost() -
         reportedSgMemCostMap.getOrDefault(storageGroupInfo, 0L);
-    totalSgMemCost += delta;
+    totalSgMemCost.addAndGet(delta);
     logger.debug("Report Storage Group Status to the system. "
           + "After adding {}, current sg mem cost is {}.", delta, totalSgMemCost);
     reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getSgMemCost());
     storageGroupInfo.setLastReportedSize(storageGroupInfo.getSgMemCost());
-    if (totalSgMemCost >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
+    if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
       logger.debug("The total storage group mem costs are too large, call for flushing. "
           + "Current sg cost is {}", totalSgMemCost);
       flush();
     }
-    if (totalSgMemCost >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+    if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
       logger.info("Change system to reject status...");
       rejected = true;
     }
@@ -72,32 +74,44 @@ public class SystemInfo {
 
   /**
    * Report resetting the mem cost of sg to system.
-   * It will be invoked after flushing, closing and failed to insert
+   * It will be called after flushing, after closing, and after a failed insert.
    *
    * @param storageGroupInfo storage group
    */
-  public synchronized void resetStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
+  public void resetStorageGroupStatus(StorageGroupInfo storageGroupInfo,
+      boolean shouldInvokeFlush) {
     if (reportedSgMemCostMap.containsKey(storageGroupInfo)) {
-      this.totalSgMemCost -= reportedSgMemCostMap.get(storageGroupInfo)
-          - storageGroupInfo.getSgMemCost();
+      this.totalSgMemCost.addAndGet(storageGroupInfo.getSgMemCost() -
+          reportedSgMemCostMap.get(storageGroupInfo));
       storageGroupInfo.setLastReportedSize(storageGroupInfo.getSgMemCost());
-      if (totalSgMemCost > config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
-        logger.debug("Some sg memory released but still exceeding flush proportion, call flush.");
-        logCurrentTotalSGMemory();
-        forceFlush();
+      reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getSgMemCost());
+      if (shouldInvokeFlush) {
+        checkSystemToInvokeFlush();
       }
-      if (totalSgMemCost < config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
-        if (rejected) {
-          logger.info("Some sg memory released, set system to normal status.");
-        }
-        logCurrentTotalSGMemory();
-        rejected = false;
-      } else {
-        logger.warn("Some sg memory released, but system is still in reject status.");
-        logCurrentTotalSGMemory();
-        rejected = true;
+    }
+  }
+
+  private void checkSystemToInvokeFlush() {
+    if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION &&
+        totalSgMemCost.get() < config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+      logger.debug("Some sg memory released but still exceeding flush proportion, call flush.");
+      if (rejected) {
+        logger.info("Some sg memory released, set system to normal status.");
       }
-      reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getSgMemCost());
+      logCurrentTotalSGMemory();
+      rejected = false;
+      forceAsyncFlush();
+    }
+    else if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+      logger.warn("Some sg memory released, but system is still in reject status.");
+      logCurrentTotalSGMemory();
+      rejected = true;
+      forceAsyncFlush();
+    } 
+    else {
+      logger.debug("Some sg memory released, system is in normal status.");
+      logCurrentTotalSGMemory();
+      rejected = false;
     }
   }
 
@@ -127,18 +141,14 @@ public class SystemInfo {
     }
   }
 
-  public void forceFlush() {
-    if (FlushManager.getInstance().getNumberOfWorkingTasks() > 0) {
-      return;
-    }
+  /**
+   * Be careful: this method may only be called by the flush thread!
+   */
+  public void forceAsyncFlush() {
     List<TsFileProcessor> processors = getTsFileProcessorsToFlush();
     for (TsFileProcessor processor : processors) {
       if (processor != null) {
-        if (processor.shouldClose()) {
-          processor.startClose();
-        } else {
-          processor.asyncFlush();
-        }
+        processor.startFlush();
       }
     }
   }
@@ -151,7 +161,7 @@ public class SystemInfo {
     }
     List<TsFileProcessor> processors = new ArrayList<>();
     long memCost = 0;
-    while (totalSgMemCost - memCost > config.getAllocateMemoryForWrite() *
+    while (totalSgMemCost.get() - memCost > config.getAllocateMemoryForWrite() *
         FLUSH_PROPORTION / 2) {
       if (tsps.isEmpty() || tsps.peek().getWorkMemTableRamCost() == 0) {
         return processors;
@@ -169,7 +179,7 @@ public class SystemInfo {
 
   public void close() {
     reportedSgMemCostMap.clear();
-    totalSgMemCost = 0;
+    totalSgMemCost.set(0);
     rejected = false;
   }