You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/07/07 11:26:43 UTC

[iotdb] 01/02: finish MemoryController

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-3164
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 86a6f3c750b72a1864151ed7c79e13cad728eab4
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Thu Jul 7 19:16:34 2022 +0800

    finish MemoryController
---
 .../apache/iotdb/db/rescon/MemoryController.java   | 113 +++++++++++++++++++--
 1 file changed, 102 insertions(+), 11 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/MemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/MemoryController.java
index 098c293381..aaf55d41b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/MemoryController.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/MemoryController.java
@@ -18,26 +18,117 @@
  */
 package org.apache.iotdb.db.rescon;
 
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
 public class MemoryController {
   private static Logger log = LoggerFactory.getLogger(MemoryController.class);
-  private static final MemoryController INSTANCE = new MemoryController();
-  private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-  private long memorySizeForWrite = config.getAllocateMemoryForWrite();
-  private long FLUSH_THERSHOLD = (long) (memorySizeForWrite * config.getFlushProportion());
-  private double REJECT_THERSHOLD = (long) (memorySizeForWrite * config.getRejectProportion());
   private AtomicLong memoryUsage = new AtomicLong(0);
+  private AtomicBoolean triggerRunning = new AtomicBoolean();
+  private long triggerThreshold = -1;
+  private long limitSize = -1;
+  private ReentrantLock lock = new ReentrantLock(false);
+  private Condition condition = lock.newCondition();
+  private Runnable trigger = null;
+
+  public MemoryController(long limitSize) {
+    this.limitSize = limitSize;
+  }
+
+  public MemoryController(long limitSize, long triggerThreshold, Runnable trigger) {
+    this.limitSize = limitSize;
+    this.triggerThreshold = triggerThreshold;
+    this.trigger = trigger;
+  }
+
+  public boolean tryAllocateMemory(long size) {
+    while (true) {
+      long current = memoryUsage.get();
+      long newUsage = current + size;
+
+      // We allow one request to go over the limit, to make the notification
+      // path simpler and more efficient
+      if (current > limitSize && limitSize > 0) {
+        return false;
+      }
+
+      if (memoryUsage.compareAndSet(current, newUsage)) {
+        checkTrigger(current, newUsage);
+        return true;
+      }
+    }
+  }
+
+  public void allocateMemoryMayBlock(long size) throws InterruptedException {
+    if (!tryAllocateMemory(size)) {
+      lock.lock();
+      try {
+        while (!tryAllocateMemory(size)) {
+          condition.await();
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+
+  public boolean allocateMemoryMayBlock(long size, long maxBlockTime) throws InterruptedException {
+    long startTime = System.currentTimeMillis();
+    if (!tryAllocateMemory(size)) {
+      lock.lock();
+      try {
+        while (tryAllocateMemory(size)) {
+          if (System.currentTimeMillis() - startTime >= maxBlockTime) {
+            return false;
+          }
+          condition.await();
+        }
+        return true;
+      } finally {
+        lock.unlock();
+      }
+    }
+    return true;
+  }
+
+  public void releaseMemory(long size) {
+    long newUsage = memoryUsage.addAndGet(-size);
+    if (newUsage + size > limitSize && newUsage <= limitSize) {
+      lock.lock();
+      try {
+        condition.signalAll();
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
 
-  private MemoryController() {}
+  private void checkTrigger(long prevUsage, long newUsage) {
+    if (newUsage >= triggerThreshold && prevUsage < triggerThreshold && trigger != null) {
+      if (triggerRunning.compareAndSet(false, true)) {
+        try {
+          trigger.run();
+        } finally {
+          triggerRunning.set(false);
+        }
+      }
+    }
+  }
+
+  public long getCurrentMemoryUsage() {
+    return memoryUsage.get();
+  }
+
+  public double getCurrentUsagePercentage() {
+    return ((double) memoryUsage.get()) / ((double) limitSize);
+  }
 
-  private MemoryController getInstance() {
-    return INSTANCE;
+  public boolean isMemoryLimited() {
+    return this.limitSize > 0;
   }
 }