You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/11/06 09:30:10 UTC

[iotdb] branch fix_mem_control_2 updated: fix cannot close file

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch fix_mem_control_2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/fix_mem_control_2 by this push:
     new 9739a0f  fix cannot close file
9739a0f is described below

commit 9739a0f5251699d992bd9a748931146d6fc81ee6
Author: HTHou <hh...@outlook.com>
AuthorDate: Fri Nov 6 17:29:14 2020 +0800

    fix cannot close file
---
 .../iotdb/db/engine/storagegroup/StorageGroupInfo.java    | 14 +++++++-------
 .../db/engine/storagegroup/StorageGroupProcessor.java     | 13 ++++++++-----
 .../iotdb/db/engine/storagegroup/TsFileProcessor.java     | 15 ++++++++++-----
 .../main/java/org/apache/iotdb/db/rescon/SystemInfo.java  |  6 ++++++
 4 files changed, 31 insertions(+), 17 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
index e696a60..a31d41a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
@@ -18,8 +18,8 @@
  */
 package org.apache.iotdb.db.engine.storagegroup;
 
-import java.util.HashSet;
-import java.util.Set;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -44,12 +44,12 @@ public class StorageGroupInfo {
   private long storageGroupSizeReportThreshold = 
       IoTDBDescriptor.getInstance().getConfig().getStorageGroupSizeReportThreshold();
 
-  private long lastReportedSize = 0L;
+  private AtomicLong lastReportedSize = new AtomicLong();
 
   /**
    * A set of all unclosed TsFileProcessors in this SG
    */
-  private Set<TsFileProcessor> reportedTsps = new HashSet<>();
+  private List<TsFileProcessor> reportedTsps = new CopyOnWriteArrayList<>();
 
   public StorageGroupInfo(StorageGroupProcessor storageGroupProcessor) {
     this.storageGroupProcessor = storageGroupProcessor;
@@ -81,16 +81,16 @@ public class StorageGroupInfo {
     return memoryCost.get();
   }
 
-  public Set<TsFileProcessor> getAllReportedTsp() {
+  public List<TsFileProcessor> getAllReportedTsp() {
     return reportedTsps;
   }
 
   public boolean needToReportToSystem() {
-    return memoryCost.get() - lastReportedSize > storageGroupSizeReportThreshold;
+    return memoryCost.get() - lastReportedSize.get() > storageGroupSizeReportThreshold;
   }
 
   public void setLastReportedSize(long size) {
-    lastReportedSize = size;
+    lastReportedSize.set(size);
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 17d09d6..be56a73 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -829,6 +829,10 @@ public class StorageGroupProcessor {
           .put(insertTabletPlan.getDeviceId().getFullPath(), insertTabletPlan.getTimes()[end - 1]);
     }
 
+    if (closingSequenceTsFileProcessor.contains(tsFileProcessor) || 
+        closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
+      return true;
+    }
     // check memtable size and may async try to flush the work memtable
     if (tsFileProcessor.shouldFlush()) {
       fileFlushPolicy.apply(this, tsFileProcessor, sequence);
@@ -914,13 +918,12 @@ public class StorageGroupProcessor {
   }
 
   public void asyncFlushMemTableInTsFileProcessor(TsFileProcessor tsFileProcessor) {
-    if (closingSequenceTsFileProcessor.contains(tsFileProcessor) || 
-        closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
-      return;
-    }
     writeLock();
     try {
-      fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
+      if (!closingSequenceTsFileProcessor.contains(tsFileProcessor) && 
+          !closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
+        fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
+      }
     } finally {
       writeUnlock();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 5e5f083..7ba02eb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -493,15 +493,15 @@ public class TsFileProcessor {
     }
     try {
 
-      if (logger.isInfoEnabled()) {
+      if (logger.isDebugEnabled()) {
         if (workMemTable != null) {
-          logger.info(
+          logger.debug(
               "{}: flush a working memtable in async close tsfile {}, memtable size: {}, tsfile size: {}",
               storageGroupName, tsFileResource.getTsFile().getAbsolutePath(),
               workMemTable.memSize(),
               tsFileResource.getTsFileSize());
         } else {
-          logger.info("{}: flush a NotifyFlushMemTable in async close tsfile {}, tsfile size: {}",
+          logger.debug("{}: flush a NotifyFlushMemTable in async close tsfile {}, tsfile size: {}",
               storageGroupName, tsFileResource.getTsFile().getAbsolutePath(),
               tsFileResource.getTsFileSize());
         }
@@ -676,13 +676,18 @@ public class TsFileProcessor {
       }
       memTable.release();
       if (enableMemControl) {
-        // For text type data, reset the mem cost in tsFileProcessorInfo
+        // reset the mem cost in StorageGroupProcessorInfo
         storageGroupInfo.releaseStorageGroupMemCost(memTable.getTVListsRamCost());
+        if (logger.isDebugEnabled()) {
+          logger.debug("[mem control] {}: {} flush finished, try to reset system memcost, "
+              + "flushing memtable list size: {}", storageGroupName,
+          tsFileResource.getTsFile().getName(), flushingMemTables.size());
+        }
         // report to System
         SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, true);
       }
       if (logger.isDebugEnabled()) {
-        logger.debug("{}: {} flush finished, remove a memtable from flushing list, "
+        logger.debug("[mem_control] {}: {} flush finished, remove a memtable from flushing list, "
                 + "flushing memtable list size: {}", storageGroupName,
             tsFileResource.getTsFile().getName(), flushingMemTables.size());
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index aa32467..74c00f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -147,7 +147,13 @@ public class SystemInfo {
    * Be Careful!! This method can only be called by flush thread!
    */
   private void forceAsyncFlush() {
+    if (FlushManager.getInstance().getNumberOfWorkingTasks() > 0) {
+      return;
+    }
     List<TsFileProcessor> processors = getTsFileProcessorsToFlush();
+    if (logger.isDebugEnabled()) {
+      logger.debug("[mem control] get {} tsp to flush", processors.size());
+    }
     for (TsFileProcessor processor : processors) {
       if (processor != null) {
         processor.startAsyncFlush();