You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/11/05 12:47:43 UTC

[iotdb] branch fix_mem_control_bug11 created (now d489968)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a change to branch fix_mem_control_bug11
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at d489968  fix some mem_control bug

This branch includes the following new commits:

     new d489968  fix some mem_control bug

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: fix some mem_control bug

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch fix_mem_control_bug11
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d489968ec5648e3590e114f03de0cf8009459be5
Author: HTHou <hh...@outlook.com>
AuthorDate: Thu Nov 5 20:37:50 2020 +0800

    fix some mem_control bug
---
 .../db/engine/storagegroup/StorageGroupInfo.java   |  2 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 12 ++++
 .../db/engine/storagegroup/TsFileProcessor.java    | 12 ++--
 .../org/apache/iotdb/db/rescon/SystemInfo.java     | 84 ++++++++++++----------
 4 files changed, 65 insertions(+), 45 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
index 4d60c52..57de6eb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
@@ -100,6 +100,6 @@ public class StorageGroupInfo {
    */
   public void closeTsFileProcessorAndReportToSystem(TsFileProcessor tsFileProcessor) {
     reportedTsps.remove(tsFileProcessor);
-    SystemInfo.getInstance().resetStorageGroupStatus(this);
+    SystemInfo.getInstance().resetStorageGroupStatus(this, true);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 8673544..5a8cc28 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -913,6 +913,18 @@ public class StorageGroupProcessor {
     }
   }
 
+  public void flushATsFileProcessor(TsFileProcessor tsFileProcessor) {
+    writeLock();
+    try {
+      // check memtable size and may async try to flush the work memtable
+      if (tsFileProcessor.shouldFlush()) {
+        fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
+      }
+    } finally {
+      writeUnlock();
+    }
+  }
+
   private TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, boolean sequence) {
     TsFileProcessor tsFileProcessor = null;
     try {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index b74a42e..2e35cb5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -293,7 +293,7 @@ public class TsFileProcessor {
       } catch (WriteProcessException e) {
         storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
         tsFileProcessorInfo.releaseTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement);
-        SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo);
+        SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, false);
         throw e;
       }
     }
@@ -357,7 +357,7 @@ public class TsFileProcessor {
       } catch (WriteProcessException e) {
         storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
         tsFileProcessorInfo.releaseTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement);
-        SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo);
+        SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, false);
         throw e;
       }
     }
@@ -677,7 +677,7 @@ public class TsFileProcessor {
         // For text type data, reset the mem cost in tsFileProcessorInfo
         storageGroupInfo.releaseStorageGroupMemCost(memTable.getTVListsRamCost());
         // report to System
-        SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo);
+        SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, true);
       }
       if (logger.isDebugEnabled()) {
         logger.debug("{}: {} flush finished, remove a memtable from flushing list, "
@@ -972,10 +972,8 @@ public class TsFileProcessor {
     return sequence;
   }
 
-  public void startClose() {
-    storageGroupInfo.getStorageGroupProcessor().asyncCloseOneTsFileProcessor(sequence, this);
-    logger.info("Async close tsfile: {}",
-        getTsFileResource().getTsFile().getAbsolutePath());
+  public void startFlush() {
+    storageGroupInfo.getStorageGroupProcessor().flushATsFileProcessor(this);
   }
 
   public void setFlush() {
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index ab3e25a..aa186ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -20,10 +20,12 @@
 package org.apache.iotdb.db.rescon;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.flush.FlushManager;
@@ -37,10 +39,10 @@ public class SystemInfo {
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private static final Logger logger = LoggerFactory.getLogger(SystemInfo.class);
 
-  private long totalSgMemCost;
+  private AtomicLong totalSgMemCost = new AtomicLong();
   private volatile boolean rejected = false;
 
-  private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new HashMap<>();
+  private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new ConcurrentHashMap<>();
 
   private static final double FLUSH_PROPORTION = config.getFlushProportion();
   private static final double REJECT_PROPORTION = config.getRejectProportion();
@@ -51,20 +53,20 @@ public class SystemInfo {
    *
    * @param storageGroupInfo storage group
    */
-  public synchronized void reportStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
+  public void reportStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
     long delta = storageGroupInfo.getSgMemCost() -
         reportedSgMemCostMap.getOrDefault(storageGroupInfo, 0L);
-    totalSgMemCost += delta;
+    totalSgMemCost.addAndGet(delta);
     logger.debug("Report Storage Group Status to the system. "
           + "After adding {}, current sg mem cost is {}.", delta, totalSgMemCost);
     reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getSgMemCost());
     storageGroupInfo.setLastReportedSize(storageGroupInfo.getSgMemCost());
-    if (totalSgMemCost >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
+    if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
       logger.debug("The total storage group mem costs are too large, call for flushing. "
           + "Current sg cost is {}", totalSgMemCost);
       flush();
     }
-    if (totalSgMemCost >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+    if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
       logger.info("Change system to reject status...");
       rejected = true;
     }
@@ -72,32 +74,44 @@ public class SystemInfo {
 
   /**
    * Report resetting the mem cost of sg to system.
-   * It will be invoked after flushing, closing and failed to insert
+   * It will be called after flushing, closing and failed to insert
    *
    * @param storageGroupInfo storage group
    */
-  public synchronized void resetStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
+  public void resetStorageGroupStatus(StorageGroupInfo storageGroupInfo,
+      boolean shouldInvokeFlush) {
     if (reportedSgMemCostMap.containsKey(storageGroupInfo)) {
-      this.totalSgMemCost -= reportedSgMemCostMap.get(storageGroupInfo)
-          - storageGroupInfo.getSgMemCost();
+      this.totalSgMemCost.addAndGet(storageGroupInfo.getSgMemCost() -
+          reportedSgMemCostMap.get(storageGroupInfo));
       storageGroupInfo.setLastReportedSize(storageGroupInfo.getSgMemCost());
-      if (totalSgMemCost > config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
-        logger.debug("Some sg memory released but still exceeding flush proportion, call flush.");
-        logCurrentTotalSGMemory();
-        forceFlush();
+      reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getSgMemCost());
+      if (shouldInvokeFlush) {
+        checkSystemToInvokeFlush();
       }
-      if (totalSgMemCost < config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
-        if (rejected) {
-          logger.info("Some sg memory released, set system to normal status.");
-        }
-        logCurrentTotalSGMemory();
-        rejected = false;
-      } else {
-        logger.warn("Some sg memory released, but system is still in reject status.");
-        logCurrentTotalSGMemory();
-        rejected = true;
+    }
+  }
+
+  private void checkSystemToInvokeFlush() {
+    if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION &&
+        totalSgMemCost.get() < config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+      logger.debug("Some sg memory released but still exceeding flush proportion, call flush.");
+      if (rejected) {
+        logger.info("Some sg memory released, set system to normal status.");
       }
-      reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getSgMemCost());
+      logCurrentTotalSGMemory();
+      rejected = false;
+      forceAsyncFlush();
+    }
+    else if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
+      logger.warn("Some sg memory released, but system is still in reject status.");
+      logCurrentTotalSGMemory();
+      rejected = true;
+      forceAsyncFlush();
+    } 
+    else {
+      logger.debug("Some sg memory released, system is in normal status.");
+      logCurrentTotalSGMemory();
+      rejected = false;
     }
   }
 
@@ -127,18 +141,14 @@ public class SystemInfo {
     }
   }
 
-  public void forceFlush() {
-    if (FlushManager.getInstance().getNumberOfWorkingTasks() > 0) {
-      return;
-    }
+  /**
+   * Be Careful!! This method can only be called by flush thread!
+   */
+  public void forceAsyncFlush() {
     List<TsFileProcessor> processors = getTsFileProcessorsToFlush();
     for (TsFileProcessor processor : processors) {
       if (processor != null) {
-        if (processor.shouldClose()) {
-          processor.startClose();
-        } else {
-          processor.asyncFlush();
-        }
+        processor.startFlush();
       }
     }
   }
@@ -151,7 +161,7 @@ public class SystemInfo {
     }
     List<TsFileProcessor> processors = new ArrayList<>();
     long memCost = 0;
-    while (totalSgMemCost - memCost > config.getAllocateMemoryForWrite() *
+    while (totalSgMemCost.get() - memCost > config.getAllocateMemoryForWrite() *
         FLUSH_PROPORTION / 2) {
       if (tsps.isEmpty() || tsps.peek().getWorkMemTableRamCost() == 0) {
         return processors;
@@ -169,7 +179,7 @@ public class SystemInfo {
 
   public void close() {
     reportedSgMemCostMap.clear();
-    totalSgMemCost = 0;
+    totalSgMemCost.set(0);
     rejected = false;
   }