You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/12/23 08:05:51 UTC

[iotdb] branch restrict_memtable_number created (now f62136f)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a change to branch restrict_memtable_number
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at f62136f  restrict flushing memtable number

This branch includes the following new commits:

     new f62136f  restrict flushing memtable number

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: restrict flushing memtable number

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch restrict_memtable_number
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f62136f3363db8469fdfce3e7348545186db58a1
Author: HTHou <hh...@outlook.com>
AuthorDate: Wed Dec 23 16:00:58 2020 +0800

    restrict flushing memtable number
---
 .../db/engine/storagegroup/TsFileProcessor.java    | 28 ++++++++++++++++++++--
 1 file changed, 26 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 68e8d86..a3cdcdb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -170,8 +170,9 @@ public class TsFileProcessor {
   public void insert(InsertRowPlan insertRowPlan) throws WriteProcessException {
 
     if (workMemTable == null) {
-      workMemTable = new PrimitiveMemTable(enableMemControl);
+      workMemTable = getAvailableMemTable();
     }
+
     if (enableMemControl) {
       checkMemCostAndAddToTspInfo(insertRowPlan);
     }
@@ -212,7 +213,7 @@ public class TsFileProcessor {
       TSStatus[] results) throws WriteProcessException {
 
     if (workMemTable == null) {
-      workMemTable = new PrimitiveMemTable(enableMemControl);
+      workMemTable = getAvailableMemTable();
     }
 
     try {
@@ -1008,4 +1009,27 @@ public class TsFileProcessor {
   public void addCloseFileListeners(Collection<CloseFileListener> listeners) {
     closeFileListeners.addAll(listeners);
   }
+
+  private IMemTable getAvailableMemTable() {
+    synchronized (flushingMemTables) {
+      if (flushingMemTables.isEmpty()) {
+        return new PrimitiveMemTable(enableMemControl);
+      } else {
+        // wait until flushingMemTables is empty
+        int waitCount = 1;
+        while (true) {
+          if (flushingMemTables.isEmpty()) {
+            return new PrimitiveMemTable();
+          }
+          try {
+            flushingMemTables.wait(1000);
+          } catch (InterruptedException e) {
+            logger.error("{} fails to wait for memtables {}, continue to wait", tsFileResource.toString(), e);
+            Thread.currentThread().interrupt();
+          }
+          logger.info("{} has waited for a memtable for {}ms", tsFileResource.toString(), waitCount++ * 1000);
+        }
+      }
+    }
+  }
 }