You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/07/21 12:37:05 UTC

[iotdb] 03/03: finish for test

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-3164-Allocate-By-Tablets
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2ad223946bb48fcad5589234591870e1aeb221a1
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Thu Jul 21 20:36:43 2022 +0800

    finish for test
---
 .../db/engine/storagegroup/TsFileProcessor.java    | 25 ++++++++++++++++----
 .../iotdb/db/rescon/memory/MemoryController.java   | 14 ++++++-----
 .../db/rescon/memory/WriteMemoryController.java    | 27 ++++++++++++++--------
 3 files changed, 45 insertions(+), 21 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 4d1e2891d1..c2b7ff861e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.adapter.CompressionRatio;
-import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.flush.CloseFileListener;
 import org.apache.iotdb.db.engine.flush.FlushListener;
 import org.apache.iotdb.db.engine.flush.FlushManager;
@@ -85,6 +84,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -786,11 +786,26 @@ public class TsFileProcessor {
     tsFileProcessorInfo.addTSPMemCost(chunkMetadataIncrement);
     WriteMemoryController controller = WriteMemoryController.getInstance();
     boolean allocateMemory = false;
+    long startTime = System.currentTimeMillis();
     try {
-      allocateMemory = controller.tryAllocateMemory(memTableIncrement, storageGroupInfo, this);
-      if (!allocateMemory) {
-        StorageEngine.blockInsertionIfReject(this);
-      }
+      do {
+        if (this.shouldFlush()) {
+          break;
+        }
+        allocateMemory = controller.tryAllocateMemory(memTableIncrement, storageGroupInfo, this);
+        try {
+          if (!allocateMemory) {
+            TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
+            if (System.currentTimeMillis() - startTime
+                > config.getMaxWaitingTimeWhenInsertBlocked()) {
+              throw new WriteProcessRejectException(
+                  "System rejected over " + (System.currentTimeMillis() - startTime) + "ms");
+            }
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      } while (!allocateMemory);
     } catch (WriteProcessRejectException e) {
       storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
       tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement);
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
index 80fe1063a7..cffe20e9f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
@@ -58,7 +58,7 @@ public class MemoryController<T> {
    * @param size
    * @return true if success to allocate else false
    */
-  public boolean tryAllocateMemory(long size, T triggerParam) {
+  public boolean tryAllocateMemory(long size, T triggerParam, boolean runTrigger) {
     while (true) {
       long current = memoryUsage.get();
       long newUsage = current + size;
@@ -70,7 +70,9 @@ public class MemoryController<T> {
       }
 
       if (memoryUsage.compareAndSet(current, newUsage)) {
-        checkTrigger(newUsage, triggerParam);
+        if (runTrigger) {
+          checkTrigger(newUsage, triggerParam);
+        }
         return true;
       }
     }
@@ -84,10 +86,10 @@ public class MemoryController<T> {
    * @throws InterruptedException
    */
   public void allocateMemoryMayBlock(long size, T triggerParam) throws InterruptedException {
-    if (!tryAllocateMemory(size, triggerParam)) {
+    if (!tryAllocateMemory(size, triggerParam, true)) {
       lock.lock();
       try {
-        while (!tryAllocateMemory(size, triggerParam)) {
+        while (!tryAllocateMemory(size, triggerParam, true)) {
           condition.await();
         }
       } finally {
@@ -107,10 +109,10 @@ public class MemoryController<T> {
   public boolean allocateMemoryMayBlock(long size, long timeout, T triggerParam)
       throws InterruptedException {
     long startTime = System.currentTimeMillis();
-    if (!tryAllocateMemory(size, triggerParam)) {
+    if (!tryAllocateMemory(size, triggerParam, true)) {
       lock.lock();
       try {
-        while (tryAllocateMemory(size, triggerParam)) {
+        while (tryAllocateMemory(size, triggerParam, true)) {
           if (System.currentTimeMillis() - startTime >= timeout) {
             return false;
           }
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
index 6fe2feb6e2..c2538bf59d 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
@@ -53,7 +53,14 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
   }
 
   public boolean tryAllocateMemory(long size, StorageGroupInfo info, TsFileProcessor processor) {
-    boolean success = super.tryAllocateMemory(size, processor);
+    boolean success = false;
+    if (size < REJECT_THRESHOLD) {
+      success = super.tryAllocateMemory(size, processor, true);
+    } else {
+      if (chooseMemtableToFlush(processor)) {
+        success = super.tryAllocateMemory(size, processor, false);
+      }
+    }
     if (memoryUsage.get() > REJECT_THRESHOLD && !rejected) {
       logger.info(
           "Change system to reject status. Triggered by: logical SG ({}), mem cost delta ({}), totalSgMemCost ({}).",
@@ -100,15 +107,15 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
     flushingMemory.addAndGet(size);
   }
 
-  protected void chooseMemtableToFlush(TsFileProcessor currentTsFileProcessor) {
+  protected boolean chooseMemtableToFlush(TsFileProcessor currentTsFileProcessor) {
     // If invoke flush by replaying logs, do not flush now!
     if (infoSet.size() == 0) {
-      return;
+      return false;
     }
     long memCost = 0;
     long activeMemSize = memoryUsage.get() - flushingMemory.get();
     if (activeMemSize - memCost < FLUSH_THRESHOLD) {
-      return;
+      return false;
     }
     PriorityQueue<TsFileProcessor> allTsFileProcessors =
         new PriorityQueue<>(
@@ -117,10 +124,11 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
       allTsFileProcessors.addAll(storageGroupInfo.getAllReportedTsp());
     }
     long selectedCount = 0;
+    boolean currentTsFileProcessorSelected = false;
     while (activeMemSize - memCost > FLUSH_THRESHOLD) {
       if (allTsFileProcessors.isEmpty()
           || allTsFileProcessors.peek().getWorkMemTableRamCost() == 0) {
-        return;
+        return false;
       }
       TsFileProcessor selectedTsFileProcessor = allTsFileProcessors.poll();
       if (selectedTsFileProcessor == null) {
@@ -130,15 +138,14 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
           || selectedTsFileProcessor.getWorkMemTable().shouldFlush()) {
         continue;
       }
+      if (selectedTsFileProcessor == currentTsFileProcessor) {
+        currentTsFileProcessorSelected = true;
+      }
       memCost += selectedTsFileProcessor.getWorkMemTableRamCost();
       selectedTsFileProcessor.setWorkMemTableShouldFlush();
       flushTaskSubmitThreadPool.submit(selectedTsFileProcessor::submitAFlushTask);
       selectedCount++;
     }
-    logger.info(
-        "Select {} memtable to flush, flushing memory is {}, remaining memory is {}",
-        selectedCount,
-        flushingMemory.get(),
-        memoryUsage.get() - flushingMemory.get());
+    return currentTsFileProcessorSelected;
   }
 }