You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/07/18 03:22:57 UTC

[iotdb] branch IOTDB-3164 updated (2ab1dddf96 -> 43b8012725)

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a change to branch IOTDB-3164
in repository https://gitbox.apache.org/repos/asf/iotdb.git


    from 2ab1dddf96 finish for test
     new 95793820e8 temp
     new 43b8012725 temp

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |  7 ++++---
 .../db/engine/storagegroup/StorageGroupInfo.java   | 10 +++++++---
 .../db/engine/storagegroup/TsFileProcessor.java    |  1 -
 .../org/apache/iotdb/db/rescon/SystemInfo.java     |  4 ++--
 .../db/rescon/memory/WriteMemoryController.java    | 23 +++++++++++++++++-----
 5 files changed, 31 insertions(+), 14 deletions(-)


[iotdb] 02/02: temp

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-3164
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 43b8012725c4b75891c7a452a9f198350f236c12
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Mon Jul 18 11:22:43 2022 +0800

    temp
---
 .../iotdb/db/engine/flush/MemTableFlushTask.java    |  7 ++++---
 .../db/engine/storagegroup/StorageGroupInfo.java    | 10 +++++++---
 .../java/org/apache/iotdb/db/rescon/SystemInfo.java |  4 ++--
 .../db/rescon/memory/WriteMemoryController.java     | 21 ++++++++++++++++-----
 4 files changed, 29 insertions(+), 13 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index eeea38fcd0..f91c346fb6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
 import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
 import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
 import org.apache.iotdb.db.rescon.SystemInfo;
+import org.apache.iotdb.db.rescon.memory.WriteMemoryController;
 import org.apache.iotdb.db.service.metrics.MetricsService;
 import org.apache.iotdb.db.service.metrics.enums.Metric;
 import org.apache.iotdb.db.service.metrics.enums.Tag;
@@ -103,8 +104,7 @@ public class MemTableFlushTask {
     if (config.isEnableMemControl() && SystemInfo.getInstance().isEncodingFasterThanIo()) {
       estimatedTemporaryMemSize =
           memTable.memSize() / memTable.getSeriesNumber() * config.getIoTaskQueueSizeForFlushing();
-      SystemInfo.getInstance().applyTemporaryMemoryForFlushing(estimatedTemporaryMemSize);
-      // TODO: ALLOCATE IN WRITE MEMORY CONTROLLER
+      WriteMemoryController.getInstance().applyExternalMemoryForFlushing(estimatedTemporaryMemSize);
     }
     long start = System.currentTimeMillis();
     long sortTime = 0;
@@ -152,7 +152,8 @@ public class MemTableFlushTask {
 
     if (config.isEnableMemControl()) {
       if (estimatedTemporaryMemSize != 0) {
-        SystemInfo.getInstance().releaseTemporaryMemoryForFlushing(estimatedTemporaryMemSize);
+        WriteMemoryController.getInstance()
+            .releaseExternalMemoryForFlushing(estimatedTemporaryMemSize);
       }
       SystemInfo.getInstance().setEncodingFasterThanIo(ioTime >= memSerializeTime);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
index e3f3769dfa..996bc0c7e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
@@ -39,7 +39,7 @@ public class StorageGroupInfo {
   private long storageGroupSizeReportThreshold =
       IoTDBDescriptor.getInstance().getConfig().getStorageGroupSizeReportThreshold();
 
-  private AtomicLong lastReportedSize = new AtomicLong();
+  private AtomicLong lastAllocateSize = new AtomicLong();
 
   /** A set of all unclosed TsFileProcessors in this SG */
   private List<TsFileProcessor> reportedTsps = new CopyOnWriteArrayList<>();
@@ -76,8 +76,12 @@ public class StorageGroupInfo {
     return reportedTsps;
   }
 
-  public void setLastReportedSize(long size) {
-    lastReportedSize.set(size);
+  public void setLastAllocateSize(long size) {
+    lastAllocateSize.set(size);
+  }
+
+  public boolean needToAllocate(long newSize) {
+    return true;
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index cb65841996..7e9961141f 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -77,7 +77,7 @@ public class SystemInfo {
           totalStorageGroupMemCost);
     }
     reportedStorageGroupMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost());
-    storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
+    storageGroupInfo.setLastAllocateSize(storageGroupInfo.getMemCost());
     if (totalStorageGroupMemCost < FLUSH_THERSHOLD) {
       return true;
     } else if (totalStorageGroupMemCost >= FLUSH_THERSHOLD
@@ -123,7 +123,7 @@ public class SystemInfo {
     if (reportedStorageGroupMemCostMap.containsKey(storageGroupInfo)) {
       delta = reportedStorageGroupMemCostMap.get(storageGroupInfo) - storageGroupInfo.getMemCost();
       this.totalStorageGroupMemCost -= delta;
-      storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
+      storageGroupInfo.setLastAllocateSize(storageGroupInfo.getMemCost());
       reportedStorageGroupMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost());
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
index a758cb6c17..9d1af8425f 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
@@ -37,15 +37,14 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
   private static final Logger logger = LoggerFactory.getLogger(WriteMemoryController.class);
   private static volatile WriteMemoryController INSTANCE;
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-  private static final long memorySizeForWrite = config.getAllocateMemoryForWrite();
-  private static final double FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion();
-  private static final double END_FLUSH_THRESHOLD = 0.7 * FLUSH_THRESHOLD;
-  private static final double REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion();
+  private static long memorySizeForWrite = config.getAllocateMemoryForWrite();
+  private static double FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion();
+  private static double REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion();
   private volatile boolean rejected = false;
   private AtomicLong flushingMemory = new AtomicLong(0);
   private Set<StorageGroupInfo> infoSet = new CopyOnWriteArraySet<>();
   private ExecutorService flushTaskSubmitThreadPool =
-      IoTDBThreadPoolFactory.newFixedThreadPool(2, "FlushTask-Submit-Pool");
+      IoTDBThreadPoolFactory.newFixedThreadPool(1, "FlushTask-Submit-Pool");
 
   public WriteMemoryController(long limitSize) {
     super(limitSize);
@@ -97,6 +96,18 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
     return INSTANCE;
   }
 
+  public void applyExternalMemoryForFlushing(long size) {
+    memorySizeForWrite -= size;
+    FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion();
+    REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion();
+  }
+
+  public void releaseExternalMemoryForFlushing(long size) {
+    memorySizeForWrite -= size;
+    FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion();
+    REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion();
+  }
+
   protected void chooseMemtableToFlush(TsFileProcessor currentTsFileProcessor) {
     // If invoke flush by replaying logs, do not flush now!
     if (infoSet.size() == 0) {


[iotdb] 01/02: temp

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-3164
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 95793820e89c6d49eec22dad25c964d7d8179b93
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Fri Jul 15 10:21:34 2022 +0800

    temp
---
 .../org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java    | 1 -
 .../org/apache/iotdb/db/rescon/memory/WriteMemoryController.java    | 6 ++++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index adaee20f82..a22031d78a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -1169,7 +1169,6 @@ public class TsFileProcessor {
         WriteMemoryController.getInstance()
             .releaseFlushingMemory(
                 memTable.getTVListsRamCost(), storageGroupName, memTable.getMemTableId());
-        logger.error("Release size {} for {}", memTable.getTVListsRamCost(), storageGroupName);
       }
       if (logger.isDebugEnabled()) {
         logger.debug(
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
index a487ffcc43..a758cb6c17 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
@@ -39,12 +39,13 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private static final long memorySizeForWrite = config.getAllocateMemoryForWrite();
   private static final double FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion();
+  private static final double END_FLUSH_THRESHOLD = 0.7 * FLUSH_THRESHOLD;
   private static final double REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion();
   private volatile boolean rejected = false;
   private AtomicLong flushingMemory = new AtomicLong(0);
   private Set<StorageGroupInfo> infoSet = new CopyOnWriteArraySet<>();
   private ExecutorService flushTaskSubmitThreadPool =
-      IoTDBThreadPoolFactory.newSingleThreadExecutor("FlushTask-Submit-Pool");
+      IoTDBThreadPoolFactory.newFixedThreadPool(2, "FlushTask-Submit-Pool");
 
   public WriteMemoryController(long limitSize) {
     super(limitSize);
@@ -122,7 +123,8 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
       if (selectedTsFileProcessor == null) {
         break;
       }
-      if (selectedTsFileProcessor.shouldFlush()) {
+      if (selectedTsFileProcessor.getWorkMemTable() == null
+          || selectedTsFileProcessor.getWorkMemTable().shouldFlush()) {
         continue;
       }
       memCost += selectedTsFileProcessor.getWorkMemTableRamCost();