You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/11/10 01:20:07 UTC

[iotdb] branch multi_leader_memory_pendingBatch_control updated: Add memory control of SyncStatus and strength memory control of Queue (#7956)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch multi_leader_memory_pendingBatch_control
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/multi_leader_memory_pendingBatch_control by this push:
     new f99c707673 Add memory control of SyncStatus and strength memory control of Queue (#7956)
f99c707673 is described below

commit f99c707673b895686da2a5e6827ab9f85ba142fa
Author: ZhangHongYin <46...@users.noreply.github.com>
AuthorDate: Thu Nov 10 09:20:01 2022 +0800

    Add memory control of SyncStatus and strength memory control of Queue (#7956)
---
 .../apache/iotdb/consensus/config/MultiLeaderConfig.java | 16 ++++++++++++----
 .../consensus/multileader/MultiLeaderConsensus.java      |  4 +++-
 .../multileader/logdispatcher/LogDispatcher.java         | 12 +++++++++---
 .../logdispatcher/MultiLeaderMemoryManager.java          | 10 +++++++---
 .../multileader/logdispatcher/PendingBatch.java          |  4 ++++
 .../consensus/multileader/logdispatcher/SyncStatus.java  | 15 ++++++++++++++-
 6 files changed, 49 insertions(+), 12 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index 8c7abec0ce..072dc00c37 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -197,7 +197,6 @@ public class MultiLeaderConfig {
 
   public static class Replication {
     private final int maxRequestNumPerBatch;
-
     private final int maxSizePerBatch;
     private final int maxPendingBatch;
     private final int maxWaitingTimeForAccumulatingBatchInMs;
@@ -206,7 +205,8 @@ public class MultiLeaderConfig {
     private final long walThrottleThreshold;
     private final long throttleTimeOutMs;
     private final long checkpointGap;
-    private final Long allocateMemoryForConsensus;
+    private final long allocateMemoryForConsensus;
+    private final long allocateMemoryForQueue;
 
     private Replication(
         int maxRequestNumPerBatch,
@@ -218,7 +218,8 @@ public class MultiLeaderConfig {
         long walThrottleThreshold,
         long throttleTimeOutMs,
         long checkpointGap,
-        long allocateMemoryForConsensus) {
+        long allocateMemoryForConsensus,
+        double maxMemoryRatioForQueue) {
       this.maxRequestNumPerBatch = maxRequestNumPerBatch;
       this.maxSizePerBatch = maxSizePerBatch;
       this.maxPendingBatch = maxPendingBatch;
@@ -229,6 +230,7 @@ public class MultiLeaderConfig {
       this.throttleTimeOutMs = throttleTimeOutMs;
       this.checkpointGap = checkpointGap;
       this.allocateMemoryForConsensus = allocateMemoryForConsensus;
+      this.allocateMemoryForQueue = (long) (allocateMemoryForConsensus * maxMemoryRatioForQueue);
     }
 
     public int getMaxRequestNumPerBatch() {
@@ -271,6 +273,10 @@ public class MultiLeaderConfig {
       return allocateMemoryForConsensus;
     }
 
+    public long getAllocateMemoryForQueue() {
+      return allocateMemoryForQueue;
+    }
+
     public static Replication.Builder newBuilder() {
       return new Replication.Builder();
     }
@@ -288,6 +294,7 @@ public class MultiLeaderConfig {
       private long throttleTimeOutMs = TimeUnit.SECONDS.toMillis(30);
       private long checkpointGap = 500;
       private long allocateMemoryForConsensus;
+      private double maxMemoryRatioForQueue = 0.6;
 
       public Replication.Builder setMaxRequestNumPerBatch(int maxRequestNumPerBatch) {
         this.maxRequestNumPerBatch = maxRequestNumPerBatch;
@@ -351,7 +358,8 @@ public class MultiLeaderConfig {
             walThrottleThreshold,
             throttleTimeOutMs,
             checkpointGap,
-            allocateMemoryForConsensus);
+            allocateMemoryForConsensus,
+            maxMemoryRatioForQueue);
       }
     }
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
index 7a86bdf540..3a6d7c1bf5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -99,7 +99,9 @@ public class MultiLeaderConsensus implements IConsensus {
                 new SyncMultiLeaderServiceClientPoolFactory(config.getMultiLeaderConfig()));
     // init multiLeader memory manager
     MultiLeaderMemoryManager.getInstance()
-        .init(config.getMultiLeaderConfig().getReplication().getAllocateMemoryForConsensus());
+        .init(
+            config.getMultiLeaderConfig().getReplication().getAllocateMemoryForConsensus(),
+            config.getMultiLeaderConfig().getReplication().getAllocateMemoryForQueue());
   }
 
   @Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 4880e2aae4..e2c44c2570 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -243,7 +243,7 @@ public class LogDispatcher {
 
     /** try to offer a request into queue with memory control */
     public boolean offer(IndexedConsensusRequest indexedConsensusRequest) {
-      if (!multiLeaderMemoryManager.reserve(indexedConsensusRequest.getSerializedSize())) {
+      if (!multiLeaderMemoryManager.reserve(indexedConsensusRequest.getSerializedSize(), true)) {
         return false;
       }
       boolean success;
@@ -268,12 +268,18 @@ public class LogDispatcher {
 
     public void stop() {
       stopped = true;
+      long requestSize = 0;
       for (IndexedConsensusRequest indexedConsensusRequest : pendingRequest) {
-        multiLeaderMemoryManager.free(indexedConsensusRequest.getSerializedSize());
+        requestSize += indexedConsensusRequest.getSerializedSize();
       }
+      pendingRequest.clear();
+      multiLeaderMemoryManager.free(requestSize);
+      requestSize = 0;
       for (IndexedConsensusRequest indexedConsensusRequest : bufferedRequest) {
-        multiLeaderMemoryManager.free(indexedConsensusRequest.getSerializedSize());
+        requestSize += indexedConsensusRequest.getSerializedSize();
       }
+      multiLeaderMemoryManager.free(requestSize);
+      syncStatus.free();
       MetricService.getInstance().removeMetricSet(metrics);
     }
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/MultiLeaderMemoryManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/MultiLeaderMemoryManager.java
index fb64f8a489..296fd1c716 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/MultiLeaderMemoryManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/MultiLeaderMemoryManager.java
@@ -31,16 +31,19 @@ public class MultiLeaderMemoryManager {
   private static final Logger logger = LoggerFactory.getLogger(MultiLeaderMemoryManager.class);
   private final AtomicLong memorySizeInByte = new AtomicLong(0);
   private Long maxMemorySizeInByte = Runtime.getRuntime().maxMemory() / 10;
+  private Long maxMemorySizeForQueueInByte = Runtime.getRuntime().maxMemory() / 100 * 8;
 
   private MultiLeaderMemoryManager() {
     MetricService.getInstance().addMetricSet(new MultiLeaderMemoryManagerMetrics(this));
   }
 
-  public boolean reserve(long size) {
+  public boolean reserve(long size, boolean fromQueue) {
     AtomicBoolean result = new AtomicBoolean(false);
     memorySizeInByte.updateAndGet(
         (memorySize) -> {
-          if (size > maxMemorySizeInByte - memorySize) {
+          long remainSize =
+              (fromQueue ? maxMemorySizeForQueueInByte : maxMemorySizeInByte) - memorySize;
+          if (memorySize > remainSize) {
             logger.debug(
                 "consensus memory limited. required: {}, used: {}, total: {}",
                 size,
@@ -70,8 +73,9 @@ public class MultiLeaderMemoryManager {
         currentUsedMemory);
   }
 
-  public void init(long maxMemorySize) {
+  public void init(long maxMemorySize, long maxMemorySizeForQueue) {
     this.maxMemorySizeInByte = maxMemorySize;
+    this.maxMemorySizeForQueueInByte = maxMemorySizeForQueue;
   }
 
   long getMemorySizeInByte() {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java
index 920e781b09..5d4685b19e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java
@@ -87,6 +87,10 @@ public class PendingBatch {
     return batches.isEmpty();
   }
 
+  public long getSerializedSize() {
+    return serializedSize;
+  }
+
   @Override
   public String toString() {
     return "PendingBatch{"
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
index 3549c1158b..e0ec7d4023 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
@@ -31,6 +31,8 @@ public class SyncStatus {
   private final MultiLeaderConfig config;
   private final IndexController controller;
   private final LinkedList<PendingBatch> pendingBatches = new LinkedList<>();
+  private final MultiLeaderMemoryManager multiLeaderMemoryManager =
+      MultiLeaderMemoryManager.getInstance();
 
   public SyncStatus(IndexController controller, MultiLeaderConfig config) {
     this.controller = controller;
@@ -40,7 +42,8 @@ public class SyncStatus {
   /** we may block here if the synchronization pipeline is full */
   public void addNextBatch(PendingBatch batch) throws InterruptedException {
     synchronized (this) {
-      while (pendingBatches.size() >= config.getReplication().getMaxPendingBatch()) {
+      while (pendingBatches.size() >= config.getReplication().getMaxPendingBatch()
+          || !multiLeaderMemoryManager.reserve(batch.getSerializedSize(), false)) {
         wait();
       }
       pendingBatches.add(batch);
@@ -61,6 +64,7 @@ public class SyncStatus {
         while (current.isSynced()) {
           controller.updateAndGet(current.getEndIndex());
           iterator.remove();
+          multiLeaderMemoryManager.free(current.getSerializedSize());
           if (iterator.hasNext()) {
             current = iterator.next();
           } else {
@@ -73,6 +77,15 @@ public class SyncStatus {
     }
   }
 
+  public void free() {
+    long size = 0;
+    for (PendingBatch pendingBatch : pendingBatches) {
+      size = pendingBatch.getSerializedSize();
+    }
+    pendingBatches.clear();
+    multiLeaderMemoryManager.free(size);
+  }
+
   /** Gets the first index that is not currently synchronized */
   public long getNextSendingIndex() {
     // we do not use ReentrantReadWriteLock because there will be only one thread reading this field