You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/07/12 09:22:05 UTC

[iotdb] 02/02: WriteMemoryController Ready for test

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-3164
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e7131f848b674402d04d4e567f2b0dfc3223e8bc
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Tue Jul 12 17:21:54 2022 +0800

    WriteMemoryController Ready for test
---
 .../iotdb/db/engine/storagegroup/StorageGroupInfo.java  |  8 ++------
 .../iotdb/db/engine/storagegroup/TsFileProcessor.java   | 17 ++++++++---------
 .../java/org/apache/iotdb/db/rescon/SystemInfo.java     |  4 ----
 .../iotdb/db/rescon/memory/WriteMemoryController.java   |  8 ++++----
 4 files changed, 14 insertions(+), 23 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
index 8577c427e7..33b310c5d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.engine.storagegroup;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.rescon.SystemInfo;
+import org.apache.iotdb.db.rescon.memory.WriteMemoryController;
 
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -75,10 +75,6 @@ public class StorageGroupInfo {
     return reportedTsps;
   }
 
-  public boolean needToReportToSystem() {
-    return memoryCost.get() - lastReportedSize.get() > storageGroupSizeReportThreshold;
-  }
-
   public void setLastReportedSize(long size) {
     lastReportedSize.set(size);
   }
@@ -91,6 +87,6 @@ public class StorageGroupInfo {
    */
   public void closeTsFileProcessorAndReportToSystem(TsFileProcessor tsFileProcessor) {
     reportedTsps.remove(tsFileProcessor);
-    SystemInfo.getInstance().resetStorageGroupStatus(this);
+    WriteMemoryController.getInstance().resetStorageGroupInfo(this);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 84c284ee05..702e336dc3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -55,7 +55,6 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.rescon.MemTableManager;
 import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
-import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.db.rescon.memory.WriteMemoryController;
 import org.apache.iotdb.db.sync.sender.manager.TsFileSyncManager;
 import org.apache.iotdb.db.utils.MemUtils;
@@ -786,14 +785,16 @@ public class TsFileProcessor {
     storageGroupInfo.addStorageGroupMemCost(memTableIncrement);
     tsFileProcessorInfo.addTSPMemCost(chunkMetadataIncrement);
     WriteMemoryController controller = WriteMemoryController.getInstance();
+    boolean allocateMemory = false;
     try {
-      if (!controller.tryAllocateMemory(memTableIncrement, storageGroupInfo, this)) {
+      allocateMemory = controller.tryAllocateMemory(memTableIncrement, storageGroupInfo, this);
+      if (!allocateMemory) {
         StorageEngine.blockInsertionIfReject(this);
       }
     } catch (WriteProcessRejectException e) {
       storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
       tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement);
-      SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo);
+      controller.resetStorageGroupInfo(storageGroupInfo);
       throw e;
     }
     workMemTable.addTVListRamCost(memTableIncrement);
@@ -808,7 +809,8 @@ public class TsFileProcessor {
     memTableIncrement += textDataIncrement;
     storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
     tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement);
-    SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo);
+    WriteMemoryController.getInstance().releaseMemory(memTableIncrement);
+    WriteMemoryController.getInstance().resetStorageGroupInfo(storageGroupInfo);
     workMemTable.releaseTVListRamCost(memTableIncrement);
     workMemTable.releaseTextDataSize(textDataIncrement);
   }
@@ -1111,9 +1113,6 @@ public class TsFileProcessor {
       flushListener.onMemTableFlushStarted(tobeFlushed);
     }
 
-    if (enableMemControl) {
-      SystemInfo.getInstance().addFlushingMemTableCost(tobeFlushed.getTVListsRamCost());
-    }
     flushingMemTables.addLast(tobeFlushed);
     if (logger.isDebugEnabled()) {
       logger.debug(
@@ -1169,8 +1168,8 @@ public class TsFileProcessor {
               flushingMemTables.size());
         }
         // report to System
-        SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo);
-        SystemInfo.getInstance().resetFlushingMemTableCost(memTable.getTVListsRamCost());
+        WriteMemoryController.getInstance().resetStorageGroupInfo(storageGroupInfo);
+        WriteMemoryController.getInstance().releaseMemory(memTable.getTVListsRamCost());
       }
       if (logger.isDebugEnabled()) {
         logger.debug(
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index 09e432bec2..cb65841996 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -162,10 +162,6 @@ public class SystemInfo {
     }
   }
 
-  public synchronized void addFlushingMemTableCost(long flushingMemTableCost) {
-    this.flushingMemTablesCost += flushingMemTableCost;
-  }
-
   public synchronized void resetFlushingMemTableCost(long flushingMemTableCost) {
     this.flushingMemTablesCost -= flushingMemTableCost;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
index 6732a9e244..84dfabf760 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
@@ -91,7 +91,6 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
     for (StorageGroupInfo storageGroupInfo : reportedStorageGroupMemCostMap.keySet()) {
       allTsFileProcessors.addAll(storageGroupInfo.getAllReportedTsp());
     }
-    boolean isCurrentTsFileProcessorSelected = false;
     long memCost = 0;
     long activeMemSize = memoryUsage.get();
     while (activeMemSize - memCost > FLUSH_THRESHOLD) {
@@ -106,10 +105,11 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
       memCost += selectedTsFileProcessor.getWorkMemTableRamCost();
       selectedTsFileProcessor.setWorkMemTableShouldFlush();
       flushTaskSubmitThreadPool.submit(selectedTsFileProcessor::submitAFlushTask);
-      if (selectedTsFileProcessor == currentTsFileProcessor) {
-        isCurrentTsFileProcessorSelected = true;
-      }
       allTsFileProcessors.poll();
     }
   }
+
+  public void resetStorageGroupInfo(StorageGroupInfo info) {
+    reportedStorageGroupMemCostMap.put(info, info.getMemCost());
+  }
 }