You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/07/18 03:22:59 UTC

[iotdb] 02/02: temp

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-3164
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 43b8012725c4b75891c7a452a9f198350f236c12
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Mon Jul 18 11:22:43 2022 +0800

    temp
---
 .../iotdb/db/engine/flush/MemTableFlushTask.java    |  7 ++++---
 .../db/engine/storagegroup/StorageGroupInfo.java    | 10 +++++++---
 .../java/org/apache/iotdb/db/rescon/SystemInfo.java |  4 ++--
 .../db/rescon/memory/WriteMemoryController.java     | 21 ++++++++++++++++-----
 4 files changed, 29 insertions(+), 13 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index eeea38fcd0..f91c346fb6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
 import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
 import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
 import org.apache.iotdb.db.rescon.SystemInfo;
+import org.apache.iotdb.db.rescon.memory.WriteMemoryController;
 import org.apache.iotdb.db.service.metrics.MetricsService;
 import org.apache.iotdb.db.service.metrics.enums.Metric;
 import org.apache.iotdb.db.service.metrics.enums.Tag;
@@ -103,8 +104,7 @@ public class MemTableFlushTask {
     if (config.isEnableMemControl() && SystemInfo.getInstance().isEncodingFasterThanIo()) {
       estimatedTemporaryMemSize =
           memTable.memSize() / memTable.getSeriesNumber() * config.getIoTaskQueueSizeForFlushing();
-      SystemInfo.getInstance().applyTemporaryMemoryForFlushing(estimatedTemporaryMemSize);
-      // TODO: ALLOCATE IN WRITE MEMORY CONTROLLER
+      WriteMemoryController.getInstance().applyExternalMemoryForFlushing(estimatedTemporaryMemSize);
     }
     long start = System.currentTimeMillis();
     long sortTime = 0;
@@ -152,7 +152,8 @@ public class MemTableFlushTask {
 
     if (config.isEnableMemControl()) {
       if (estimatedTemporaryMemSize != 0) {
-        SystemInfo.getInstance().releaseTemporaryMemoryForFlushing(estimatedTemporaryMemSize);
+        WriteMemoryController.getInstance()
+            .releaseExternalMemoryForFlushing(estimatedTemporaryMemSize);
       }
       SystemInfo.getInstance().setEncodingFasterThanIo(ioTime >= memSerializeTime);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
index e3f3769dfa..996bc0c7e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
@@ -39,7 +39,7 @@ public class StorageGroupInfo {
   private long storageGroupSizeReportThreshold =
       IoTDBDescriptor.getInstance().getConfig().getStorageGroupSizeReportThreshold();
 
-  private AtomicLong lastReportedSize = new AtomicLong();
+  private AtomicLong lastAllocateSize = new AtomicLong();
 
   /** A set of all unclosed TsFileProcessors in this SG */
   private List<TsFileProcessor> reportedTsps = new CopyOnWriteArrayList<>();
@@ -76,8 +76,12 @@ public class StorageGroupInfo {
     return reportedTsps;
   }
 
-  public void setLastReportedSize(long size) {
-    lastReportedSize.set(size);
+  public void setLastAllocateSize(long size) {
+    lastAllocateSize.set(size);
+  }
+
+  public boolean needToAllocate(long newSize) {
+    return true;
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index cb65841996..7e9961141f 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -77,7 +77,7 @@ public class SystemInfo {
           totalStorageGroupMemCost);
     }
     reportedStorageGroupMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost());
-    storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
+    storageGroupInfo.setLastAllocateSize(storageGroupInfo.getMemCost());
     if (totalStorageGroupMemCost < FLUSH_THERSHOLD) {
       return true;
     } else if (totalStorageGroupMemCost >= FLUSH_THERSHOLD
@@ -123,7 +123,7 @@ public class SystemInfo {
     if (reportedStorageGroupMemCostMap.containsKey(storageGroupInfo)) {
       delta = reportedStorageGroupMemCostMap.get(storageGroupInfo) - storageGroupInfo.getMemCost();
       this.totalStorageGroupMemCost -= delta;
-      storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
+      storageGroupInfo.setLastAllocateSize(storageGroupInfo.getMemCost());
       reportedStorageGroupMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost());
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
index a758cb6c17..9d1af8425f 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
@@ -37,15 +37,14 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
   private static final Logger logger = LoggerFactory.getLogger(WriteMemoryController.class);
   private static volatile WriteMemoryController INSTANCE;
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-  private static final long memorySizeForWrite = config.getAllocateMemoryForWrite();
-  private static final double FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion();
-  private static final double END_FLUSH_THRESHOLD = 0.7 * FLUSH_THRESHOLD;
-  private static final double REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion();
+  private static long memorySizeForWrite = config.getAllocateMemoryForWrite();
+  private static double FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion();
+  private static double REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion();
   private volatile boolean rejected = false;
   private AtomicLong flushingMemory = new AtomicLong(0);
   private Set<StorageGroupInfo> infoSet = new CopyOnWriteArraySet<>();
   private ExecutorService flushTaskSubmitThreadPool =
-      IoTDBThreadPoolFactory.newFixedThreadPool(2, "FlushTask-Submit-Pool");
+      IoTDBThreadPoolFactory.newFixedThreadPool(1, "FlushTask-Submit-Pool");
 
   public WriteMemoryController(long limitSize) {
     super(limitSize);
@@ -97,6 +96,18 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
     return INSTANCE;
   }
 
+  public void applyExternalMemoryForFlushing(long size) {
+    memorySizeForWrite -= size;
+    FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion();
+    REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion();
+  }
+
+  public void releaseExternalMemoryForFlushing(long size) {
+    memorySizeForWrite -= size;
+    FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion();
+    REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion();
+  }
+
   protected void chooseMemtableToFlush(TsFileProcessor currentTsFileProcessor) {
     // If invoke flush by replaying logs, do not flush now!
     if (infoSet.size() == 0) {