You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/07/07 11:26:43 UTC
[iotdb] 01/02: finish MemoryController
This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch IOTDB-3164
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 86a6f3c750b72a1864151ed7c79e13cad728eab4
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Thu Jul 7 19:16:34 2022 +0800
finish MemoryController
---
.../apache/iotdb/db/rescon/MemoryController.java | 113 +++++++++++++++++++--
1 file changed, 102 insertions(+), 11 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/MemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/MemoryController.java
index 098c293381..aaf55d41b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/MemoryController.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/MemoryController.java
@@ -18,26 +18,117 @@
*/
package org.apache.iotdb.db.rescon;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
public class MemoryController {
private static Logger log = LoggerFactory.getLogger(MemoryController.class);
- private static final MemoryController INSTANCE = new MemoryController();
- private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- private long memorySizeForWrite = config.getAllocateMemoryForWrite();
- private long FLUSH_THERSHOLD = (long) (memorySizeForWrite * config.getFlushProportion());
- private double REJECT_THERSHOLD = (long) (memorySizeForWrite * config.getRejectProportion());
private AtomicLong memoryUsage = new AtomicLong(0);
+ // True while the trigger callback is executing, so that only one caller runs it at a time.
+ // Final: never reassigned, and final fields are safely published to other threads.
+ private final AtomicBoolean triggerRunning = new AtomicBoolean();
+ // Usage level that fires the trigger when an allocation crosses it; defaults to -1.
+ private long triggerThreshold = -1;
+ // Memory limit; only values greater than 0 are treated as an active limit. Defaults to -1.
+ private long limitSize = -1;
+ // Guards the condition below; created with fairness disabled.
+ private final ReentrantLock lock = new ReentrantLock(false);
+ // Awaited by the blocking allocate methods and signalled by releaseMemory.
+ private final Condition condition = lock.newCondition();
+ // Callback run by checkTrigger; null when no trigger has been supplied.
+ private Runnable trigger = null;
+
+ /**
+  * Creates a controller with the given limit and no trigger callback.
+  *
+  * @param limitSize the memory limit; values that are not greater than 0 leave the controller
+  *     unlimited
+  */
+ public MemoryController(long limitSize) {
+   // -1 and null are the same values the field initializers give when no trigger is supplied.
+   this(limitSize, -1, null);
+ }
+
+ /**
+  * Creates a controller with a limit and a callback that is run when usage reaches a threshold.
+  *
+  * @param limitSize the memory limit; values that are not greater than 0 leave the controller
+  *     unlimited
+  * @param triggerThreshold usage level at which {@code trigger} is run once an allocation
+  *     crosses it from below
+  * @param trigger callback to run; when {@code null} nothing is ever run
+  */
+ public MemoryController(long limitSize, long triggerThreshold, Runnable trigger) {
+   this.trigger = trigger;
+   this.triggerThreshold = triggerThreshold;
+   this.limitSize = limitSize;
+ }
+
+ /**
+  * Attempts to reserve {@code size} units of memory without blocking.
+  *
+  * <p>The request is refused only when a limit is set and usage is already above it, so a
+  * single request may push usage past the limit. After a successful reservation the trigger
+  * check is run with the usage before and after the change.
+  *
+  * @param size amount of memory to reserve
+  * @return {@code true} if the memory was reserved, {@code false} if usage already exceeds the
+  *     limit
+  */
+ public boolean tryAllocateMemory(long size) {
+   for (;;) {
+     final long before = memoryUsage.get();
+     // Only usage that is already over the limit is refused; letting one request overshoot
+     // keeps the notification path simple and cheap.
+     if (limitSize > 0 && before > limitSize) {
+       return false;
+     }
+     final long after = before + size;
+     // Another thread changed the counter in the meantime: start over with a fresh read.
+     if (!memoryUsage.compareAndSet(before, after)) {
+       continue;
+     }
+     checkTrigger(before, after);
+     return true;
+   }
+ }
+
+ /**
+  * Reserves {@code size} units of memory, waiting for as long as it takes for the reservation
+  * to succeed.
+  *
+  * @param size amount of memory to reserve
+  * @throws InterruptedException if the calling thread is interrupted while waiting
+  */
+ public void allocateMemoryMayBlock(long size) throws InterruptedException {
+   // Fast path: no lock is taken when the reservation succeeds immediately.
+   if (tryAllocateMemory(size)) {
+     return;
+   }
+   lock.lock();
+   try {
+     // Re-check under the lock, then sleep until releaseMemory signals the condition.
+     while (!tryAllocateMemory(size)) {
+       condition.await();
+     }
+   } finally {
+     lock.unlock();
+   }
+ }
+
+ /**
+  * Reserves {@code size} units of memory, waiting at most {@code maxBlockTime} milliseconds
+  * for the reservation to succeed.
+  *
+  * @param size amount of memory to reserve
+  * @param maxBlockTime maximum time to wait, in milliseconds
+  * @return {@code true} if the memory was reserved, {@code false} if the wait timed out first
+  * @throws InterruptedException if the calling thread is interrupted while waiting
+  */
+ public boolean allocateMemoryMayBlock(long size, long maxBlockTime) throws InterruptedException {
+   // Fast path: no lock is taken when the reservation succeeds immediately.
+   if (tryAllocateMemory(size)) {
+     return true;
+   }
+   // Fully qualified so that no additional import is required; toNanos saturates on overflow.
+   long remainingNanos = java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(maxBlockTime);
+   lock.lock();
+   try {
+     // Keep waiting while the reservation FAILS. Looping while it succeeds would either report
+     // success without reserving anything or reserve the same request more than once.
+     while (!tryAllocateMemory(size)) {
+       if (remainingNanos <= 0L) {
+         return false;
+       }
+       // A timed wait returns at the deadline even if no memory is ever released;
+       // awaitNanos hands back the time still left, which also covers spurious wakeups.
+       remainingNanos = condition.awaitNanos(remainingNanos);
+     }
+     return true;
+   } finally {
+     lock.unlock();
+   }
+ }
+
+ /**
+  * Returns {@code size} units of memory to the controller and wakes every blocked allocator
+  * when this release brings usage from above the limit to at or below it.
+  *
+  * @param size amount of memory to give back
+  */
+ public void releaseMemory(long size) {
+   final long after = memoryUsage.addAndGet(-size);
+   final long before = after + size;
+   // Allocation is refused only while usage is above the limit, so waiters need a wake-up
+   // only on the release that crosses back to the limit or below.
+   final boolean droppedToLimit = before > limitSize && after <= limitSize;
+   if (!droppedToLimit) {
+     return;
+   }
+   lock.lock();
+   try {
+     condition.signalAll();
+   } finally {
+     lock.unlock();
+   }
+ }
- private MemoryController() {}
+ /**
+  * Runs the trigger callback on the calling thread when an allocation moved usage from below
+  * {@code triggerThreshold} to at or above it. Does nothing when no callback is set or when
+  * the callback is already being run by another caller.
+  *
+  * @param prevUsage usage before the allocation
+  * @param newUsage usage after the allocation
+  */
+ private void checkTrigger(long prevUsage, long newUsage) {
+   final boolean crossedThreshold = prevUsage < triggerThreshold && newUsage >= triggerThreshold;
+   if (trigger == null || !crossedThreshold) {
+     return;
+   }
+   // Claim the running flag; a caller that loses the race skips this crossing.
+   if (!triggerRunning.compareAndSet(false, true)) {
+     return;
+   }
+   try {
+     trigger.run();
+   } finally {
+     // Always clear the flag so that a failing callback cannot disable future triggers.
+     triggerRunning.set(false);
+   }
+ }
+
+ /**
+  * Reports the amount of memory currently recorded as reserved.
+  *
+  * @return the current value of the usage counter
+  */
+ public long getCurrentMemoryUsage() {
+   final long inUse = memoryUsage.get();
+   return inUse;
+ }
+
+ /**
+  * Reports current usage relative to the limit.
+  *
+  * <p>The result is a plain ratio (usage divided by limit), not scaled by 100. It is not
+  * meaningful when the limit is not positive: the division then yields a negative or
+  * non-finite value.
+  *
+  * @return current usage divided by the limit
+  */
+ public double getCurrentUsagePercentage() {
+   final double used = memoryUsage.get();
+   final double limit = limitSize;
+   return used / limit;
+ }
- private MemoryController getInstance() {
- return INSTANCE;
+ /**
+  * Tells whether a limit is in force.
+  *
+  * @return {@code true} when the configured limit is greater than 0
+  */
+ public boolean isMemoryLimited() {
+   return limitSize > 0;
}
}