You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/11/14 13:44:45 UTC

[iotdb] branch multi_leader_memory_pendingBatch_control updated: refine dispatcher logic

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch multi_leader_memory_pendingBatch_control
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/multi_leader_memory_pendingBatch_control by this push:
     new dd0b79582d refine dispatcher logic
dd0b79582d is described below

commit dd0b79582dc92a1c65348d693fcd6d9a295d2e8c
Author: OneSizeFitQuorum <ta...@apache.org>
AuthorDate: Mon Nov 14 21:44:35 2022 +0800

    refine dispatcher logic
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
 .../multileader/logdispatcher/LogDispatcher.java   | 52 +++++++++++-----------
 .../multileader/logdispatcher/SyncStatus.java      |  2 +-
 2 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index e2c44c2570..8c7a443d3c 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -349,47 +349,47 @@ public class LogDispatcher {
 
     public PendingBatch getBatch() {
       long startIndex = syncStatus.getNextSendingIndex();
-      long maxIndexWhenBufferedRequestEmpty = startIndex;
-      logger.debug("[GetBatch] startIndex: {}", startIndex);
-      if (bufferedRequest.size() <= config.getReplication().getMaxRequestNumPerBatch()) {
-        // Use drainTo instead of poll to reduce lock overhead
+      long maxIndex;
+      synchronized (impl.getIndexObject()) {
+        maxIndex = impl.getIndex() + 1;
         logger.debug(
-            "{} : pendingRequest Size: {}, bufferedRequest size: {}",
+            "{}: startIndex: {}, maxIndex: {}, pendingRequest size: {}, bufferedRequest size: {}",
             impl.getThisNode().getGroupId(),
+            startIndex,
+            maxIndex,
             getPendingRequestSize(),
             bufferedRequest.size());
-        synchronized (impl.getIndexObject()) {
-          pendingRequest.drainTo(
-              bufferedRequest,
-              config.getReplication().getMaxRequestNumPerBatch() - bufferedRequest.size());
-          maxIndexWhenBufferedRequestEmpty = impl.getIndex() + 1;
-        }
-        // remove all request that searchIndex < startIndex
-        Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
-        while (iterator.hasNext()) {
-          IndexedConsensusRequest request = iterator.next();
-          if (request.getSearchIndex() < startIndex) {
-            iterator.remove();
-            releaseReservedMemory(request);
-          } else {
-            break;
-          }
+        // Use drainTo instead of poll to reduce lock overhead
+        pendingRequest.drainTo(
+            bufferedRequest,
+            config.getReplication().getMaxRequestNumPerBatch() - bufferedRequest.size());
+      }
+      // remove all requests whose searchIndex < startIndex
+      Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
+      while (iterator.hasNext()) {
+        IndexedConsensusRequest request = iterator.next();
+        if (request.getSearchIndex() < startIndex) {
+          iterator.remove();
+          releaseReservedMemory(request);
+        } else {
+          break;
         }
       }
+
       PendingBatch batches = new PendingBatch(config);
       // This condition will be executed in several scenarios:
       // 1. restart
       // 2. The getBatch() is invoked immediately at the moment the PendingRequests are consumed
       // up. To prevent inconsistency here, we use the synchronized logic when calculate value of
-      // `maxIndexWhenBufferedRequestEmpty`
+      // `maxIndex`
       if (bufferedRequest.isEmpty()) {
-        constructBatchFromWAL(startIndex, maxIndexWhenBufferedRequestEmpty, batches);
+        constructBatchFromWAL(startIndex, maxIndex, batches);
         batches.buildIndex();
         logger.debug(
             "{} : accumulated a {} from wal when empty", impl.getThisNode().getGroupId(), batches);
       } else {
         // Notice that prev searchIndex >= startIndex
-        Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
+        iterator = bufferedRequest.iterator();
         IndexedConsensusRequest prev = iterator.next();
 
         // Prevents gap between logs. For example, some requests are not written into the queue when
@@ -481,14 +481,14 @@ public class LogDispatcher {
           logger.warn("wait for next WAL entry is interrupted");
         }
         IndexedConsensusRequest data = walEntryIterator.next();
-        if (targetIndex > data.getSearchIndex()) {
+        if (data.getSearchIndex() < targetIndex) {
           // if the index of request is smaller than currentIndex, then continue
           logger.warn(
               "search for one Entry which index is {}, but find a smaller one, index : {}",
               targetIndex,
               data.getSearchIndex());
           continue;
-        } else if (targetIndex < data.getSearchIndex()) {
+        } else if (data.getSearchIndex() > targetIndex) {
           logger.warn(
               "search for one Entry which index is {}, but find a larger one, index : {}",
               targetIndex,
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
index e0ec7d4023..25af8b8fcd 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
@@ -80,7 +80,7 @@ public class SyncStatus {
   public void free() {
     long size = 0;
     for (PendingBatch pendingBatch : pendingBatches) {
-      size = pendingBatch.getSerializedSize();
+      size += pendingBatch.getSerializedSize();
     }
     pendingBatches.clear();
     multiLeaderMemoryManager.free(size);