You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/11/06 15:18:35 UTC

[iotdb] branch master updated: Fix mem control step 2 (#1970)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 206ae5d  Fix mem control step 2 (#1970)
206ae5d is described below

commit 206ae5d1a32a95857dd61b6a0af622a48a552958
Author: Haonan <hh...@outlook.com>
AuthorDate: Fri Nov 6 23:18:23 2020 +0800

    Fix mem control step 2 (#1970)
---
 .../db/engine/storagegroup/StorageGroupInfo.java   | 19 ++++++------
 .../engine/storagegroup/StorageGroupProcessor.java |  9 +++---
 .../db/engine/storagegroup/TsFileProcessor.java    |  9 +++++-
 .../engine/storagegroup/TsFileProcessorInfo.java   |  4 ++-
 .../org/apache/iotdb/db/rescon/SystemInfo.java     | 34 +++++++++++++---------
 5 files changed, 46 insertions(+), 29 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
index 57de6eb..a31d41a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
@@ -18,8 +18,8 @@
  */
 package org.apache.iotdb.db.engine.storagegroup;
 
-import java.util.HashSet;
-import java.util.Set;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -33,7 +33,8 @@ public class StorageGroupInfo {
   private StorageGroupProcessor storageGroupProcessor;
 
   /**
-   * The total Storage group memory cost
+   * The total Storage group memory cost,
+   * including unsealed TsFileResource, ChunkMetadata, WAL, primitive arrays and TEXT values
    */
   private AtomicLong memoryCost;
 
@@ -43,12 +44,12 @@ public class StorageGroupInfo {
   private long storageGroupSizeReportThreshold = 
       IoTDBDescriptor.getInstance().getConfig().getStorageGroupSizeReportThreshold();
 
-  private long lastReportedSize = 0L;
+  private AtomicLong lastReportedSize = new AtomicLong();
 
   /**
    * A set of all unclosed TsFileProcessors in this SG
    */
-  private Set<TsFileProcessor> reportedTsps = new HashSet<>();
+  private List<TsFileProcessor> reportedTsps = new CopyOnWriteArrayList<>();
 
   public StorageGroupInfo(StorageGroupProcessor storageGroupProcessor) {
     this.storageGroupProcessor = storageGroupProcessor;
@@ -76,20 +77,20 @@ public class StorageGroupInfo {
     memoryCost.getAndAdd(-cost);
   }
 
-  public long getSgMemCost() {
+  public long getMemCost() {
     return memoryCost.get();
   }
 
-  public Set<TsFileProcessor> getAllReportedTsp() {
+  public List<TsFileProcessor> getAllReportedTsp() {
     return reportedTsps;
   }
 
   public boolean needToReportToSystem() {
-    return memoryCost.get() - lastReportedSize > storageGroupSizeReportThreshold;
+    return memoryCost.get() - lastReportedSize.get() > storageGroupSizeReportThreshold;
   }
 
   public void setLastReportedSize(long size) {
-    lastReportedSize = size;
+    lastReportedSize.set(size);
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 17d09d6..2b49583 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -914,13 +914,12 @@ public class StorageGroupProcessor {
   }
 
   public void asyncFlushMemTableInTsFileProcessor(TsFileProcessor tsFileProcessor) {
-    if (closingSequenceTsFileProcessor.contains(tsFileProcessor) || 
-        closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
-      return;
-    }
     writeLock();
     try {
-      fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
+      if (!closingSequenceTsFileProcessor.contains(tsFileProcessor) && 
+          !closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
+        fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
+      }
     } finally {
       writeUnlock();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 5e5f083..b810774 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -421,6 +421,8 @@ public class TsFileProcessor {
       return false;
     }
     if (shouldFlush) {
+      logger.info("The memtable size {} of tsfile {} reaches the mem control threshold",
+          workMemTable.memSize(), tsFileResource.getTsFile().getAbsolutePath());
       return true;
     }
     if (!enableMemControl && workMemTable.memSize() >= getMemtableSizeThresholdBasedOnSeriesNum()) {
@@ -676,8 +678,13 @@ public class TsFileProcessor {
       }
       memTable.release();
       if (enableMemControl) {
-        // For text type data, reset the mem cost in tsFileProcessorInfo
+        // reset the mem cost in StorageGroupProcessorInfo
         storageGroupInfo.releaseStorageGroupMemCost(memTable.getTVListsRamCost());
+        if (logger.isDebugEnabled()) {
+          logger.debug("[mem control] {}: {} flush finished, try to reset system memcost, "
+              + "flushing memtable list size: {}", storageGroupName,
+          tsFileResource.getTsFile().getName(), flushingMemTables.size());
+        }
         // report to System
         SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, true);
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorInfo.java
index 52bc863..417168e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorInfo.java
@@ -30,7 +30,9 @@ public class TsFileProcessorInfo {
    */
   private StorageGroupInfo storageGroupInfo;
 
-  // unsealed TsFileResource, ChunkMetadata, WAL
+  /**
+   * memory occupation of unsealed TsFileResource, ChunkMetadata, WAL
+   */
   private long memCost;
 
 
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index fa1715e..74c00f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -54,17 +54,19 @@ public class SystemInfo {
    * @param storageGroupInfo storage group
    */
   public void reportStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
-    long delta = storageGroupInfo.getSgMemCost() -
+    long delta = storageGroupInfo.getMemCost() -
         reportedSgMemCostMap.getOrDefault(storageGroupInfo, 0L);
     totalSgMemCost.addAndGet(delta);
-    logger.debug("Report Storage Group Status to the system. "
+    if (logger.isDebugEnabled()) {
+      logger.debug("Report Storage Group Status to the system. "
           + "After adding {}, current sg mem cost is {}.", delta, totalSgMemCost);
-    reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getSgMemCost());
-    storageGroupInfo.setLastReportedSize(storageGroupInfo.getSgMemCost());
+    }
+    reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost());
+    storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
     if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) {
       logger.debug("The total storage group mem costs are too large, call for flushing. "
           + "Current sg cost is {}", totalSgMemCost);
-      flush();
+      chooseTSPToMarkFlush();
     }
     if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) {
       logger.info("Change system to reject status...");
@@ -81,10 +83,10 @@ public class SystemInfo {
   public void resetStorageGroupStatus(StorageGroupInfo storageGroupInfo,
       boolean shouldInvokeFlush) {
     if (reportedSgMemCostMap.containsKey(storageGroupInfo)) {
-      this.totalSgMemCost.addAndGet(storageGroupInfo.getSgMemCost() -
+      this.totalSgMemCost.addAndGet(storageGroupInfo.getMemCost() -
           reportedSgMemCostMap.get(storageGroupInfo));
-      storageGroupInfo.setLastReportedSize(storageGroupInfo.getSgMemCost());
-      reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getSgMemCost());
+      storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
+      reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost());
       if (shouldInvokeFlush) {
         checkSystemToInvokeFlush();
       }
@@ -120,11 +122,11 @@ public class SystemInfo {
   }
 
   /**
-   * Flush the tsfileProcessor in SG with the max mem cost. If the queue size of flushing >
-   * threshold, it's identified as flushing is in progress.
+   * Order all tsfileProcessors in system by memory cost of actual data points in memtable.
+   * Mark the top K TSPs as to be flushed,
+   * so that after flushing the K TSPs, the memory cost should be less than FLUSH_THRESHOLD
    */
-  public void flush() {
-
+  private void chooseTSPToMarkFlush() {
     if (FlushManager.getInstance().getNumberOfWorkingTasks() > 0) {
       return;
     }
@@ -144,8 +146,14 @@ public class SystemInfo {
   /**
    * Be Careful!! This method can only be called by flush thread!
    */
-  public void forceAsyncFlush() {
+  private void forceAsyncFlush() {
+    if (FlushManager.getInstance().getNumberOfWorkingTasks() > 0) {
+      return;
+    }
     List<TsFileProcessor> processors = getTsFileProcessorsToFlush();
+    if (logger.isDebugEnabled()) {
+      logger.debug("[mem control] get {} tsp to flush", processors.size());
+    }
     for (TsFileProcessor processor : processors) {
       if (processor != null) {
         processor.startAsyncFlush();