You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/11/09 14:11:36 UTC

[iotdb] branch multi_leader_memory_pendingBatch_control updated (e6aa8dea1f -> 065840b769)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch multi_leader_memory_pendingBatch_control
in repository https://gitbox.apache.org/repos/asf/iotdb.git


 discard e6aa8dea1f Control the size of each pending batch to be no larger than maxSizePerBatch
     new 065840b769 Control the size of each pending batch to be no larger than maxSizePerBatch

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (e6aa8dea1f)
            \
             N -- N -- N   refs/heads/multi_leader_memory_pendingBatch_control (065840b769)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../multileader/client/DispatchLogHandler.java     |  2 +-
 .../logdispatcher/AccumulatingBatch.java           | 67 ----------------------
 .../multileader/logdispatcher/LogDispatcher.java   | 63 ++++++++++----------
 .../multileader/logdispatcher/PendingBatch.java    | 55 ++++++++++++------
 .../multileader/logdispatcher/SyncStatusTest.java  | 26 +++++----
 5 files changed, 82 insertions(+), 131 deletions(-)
 delete mode 100644 consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/AccumulatingBatch.java


[iotdb] 01/01: Control the size of each pending batch to be no larger than maxSizePerBatch

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch multi_leader_memory_pendingBatch_control
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 065840b769d7424215ee7e3bfaf708c4568a818b
Author: OneSizeFitQuorum <ta...@apache.org>
AuthorDate: Wed Nov 9 21:19:35 2022 +0800

    Control the size of each pending batch to be no larger than maxSizePerBatch
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
 .../common/request/IndexedConsensusRequest.java    | 26 +++----
 .../iotdb/consensus/config/MultiLeaderConfig.java  | 38 +++++++---
 .../multileader/MultiLeaderServerImpl.java         |  2 +-
 .../multileader/logdispatcher/LogDispatcher.java   | 82 +++++++++++-----------
 .../multileader/logdispatcher/PendingBatch.java    | 40 ++++++++---
 .../multileader/logdispatcher/SyncStatus.java      |  4 +-
 .../multileader/logdispatcher/SyncStatusTest.java  | 21 ++++--
 .../plan/node/write/InsertMultiTabletsNode.java    |  2 +-
 8 files changed, 131 insertions(+), 84 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index 6abca549b6..ca236da1c3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.consensus.common.request;
 
 import java.nio.ByteBuffer;
-import java.util.LinkedList;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
@@ -31,8 +31,8 @@ public class IndexedConsensusRequest implements IConsensusRequest {
   private final long searchIndex;
 
   private final long syncIndex;
-  private List<IConsensusRequest> requests;
-  private List<ByteBuffer> serializedRequests;
+  private final List<IConsensusRequest> requests;
+  private final List<ByteBuffer> serializedRequests = new ArrayList<>();
   private long serializedSize = 0;
 
   public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> requests) {
@@ -41,15 +41,6 @@ public class IndexedConsensusRequest implements IConsensusRequest {
     this.syncIndex = -1L;
   }
 
-  public IndexedConsensusRequest(List<ByteBuffer> serializedRequests, long searchIndex) {
-    this.searchIndex = searchIndex;
-    this.serializedRequests = serializedRequests;
-    for (ByteBuffer byteBuffer : serializedRequests) {
-      serializedSize += byteBuffer.capacity();
-    }
-    this.syncIndex = -1L;
-  }
-
   public IndexedConsensusRequest(
       long searchIndex, long syncIndex, List<IConsensusRequest> requests) {
     this.searchIndex = searchIndex;
@@ -70,10 +61,13 @@ public class IndexedConsensusRequest implements IConsensusRequest {
     return serializedRequests;
   }
 
-  public List<ByteBuffer> buildSerializedRequests() {
-    List<ByteBuffer> result = new LinkedList<>();
-    this.requests.forEach(r -> result.add(r.serializeToByteBuffer()));
-    return result;
+  public void buildSerializedRequests() {
+    this.requests.forEach(
+        r -> {
+          ByteBuffer buffer = r.serializeToByteBuffer();
+          this.serializedRequests.add(buffer);
+          this.serializedSize += buffer.capacity();
+        });
   }
 
   public long getSerializedSize() {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index cd0d58c4ae..8c7abec0ce 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -196,7 +196,9 @@ public class MultiLeaderConfig {
   }
 
   public static class Replication {
-    private final int maxRequestPerBatch;
+    private final int maxRequestNumPerBatch;
+
+    private final int maxSizePerBatch;
     private final int maxPendingBatch;
     private final int maxWaitingTimeForAccumulatingBatchInMs;
     private final long basicRetryWaitTimeMs;
@@ -207,7 +209,8 @@ public class MultiLeaderConfig {
     private final Long allocateMemoryForConsensus;
 
     private Replication(
-        int maxRequestPerBatch,
+        int maxRequestNumPerBatch,
+        int maxSizePerBatch,
         int maxPendingBatch,
         int maxWaitingTimeForAccumulatingBatchInMs,
         long basicRetryWaitTimeMs,
@@ -216,7 +219,8 @@ public class MultiLeaderConfig {
         long throttleTimeOutMs,
         long checkpointGap,
         long allocateMemoryForConsensus) {
-      this.maxRequestPerBatch = maxRequestPerBatch;
+      this.maxRequestNumPerBatch = maxRequestNumPerBatch;
+      this.maxSizePerBatch = maxSizePerBatch;
       this.maxPendingBatch = maxPendingBatch;
       this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs;
       this.basicRetryWaitTimeMs = basicRetryWaitTimeMs;
@@ -227,8 +231,12 @@ public class MultiLeaderConfig {
       this.allocateMemoryForConsensus = allocateMemoryForConsensus;
     }
 
-    public int getMaxRequestPerBatch() {
-      return maxRequestPerBatch;
+    public int getMaxRequestNumPerBatch() {
+      return maxRequestNumPerBatch;
+    }
+
+    public int getMaxSizePerBatch() {
+      return maxSizePerBatch;
     }
 
     public int getMaxPendingBatch() {
@@ -268,7 +276,8 @@ public class MultiLeaderConfig {
     }
 
     public static class Builder {
-      private int maxRequestPerBatch = 30;
+      private int maxRequestNumPerBatch = 30;
+      private int maxSizePerBatch = 4 * 1024 * 1024;
       // (IMPORTANT) Value of this variable should be the same with MAX_REQUEST_CACHE_SIZE
       // in DataRegionStateMachine
       private int maxPendingBatch = 5;
@@ -280,8 +289,13 @@ public class MultiLeaderConfig {
       private long checkpointGap = 500;
       private long allocateMemoryForConsensus;
 
-      public Replication.Builder setMaxRequestPerBatch(int maxRequestPerBatch) {
-        this.maxRequestPerBatch = maxRequestPerBatch;
+      public Replication.Builder setMaxRequestNumPerBatch(int maxRequestNumPerBatch) {
+        this.maxRequestNumPerBatch = maxRequestNumPerBatch;
+        return this;
+      }
+
+      public Builder setMaxSizePerBatch(int maxSizePerBatch) {
+        this.maxSizePerBatch = maxSizePerBatch;
         return this;
       }
 
@@ -316,6 +330,11 @@ public class MultiLeaderConfig {
         return this;
       }
 
+      public Builder setCheckpointGap(long checkpointGap) {
+        this.checkpointGap = checkpointGap;
+        return this;
+      }
+
       public Replication.Builder setAllocateMemoryForConsensus(long allocateMemoryForConsensus) {
         this.allocateMemoryForConsensus = allocateMemoryForConsensus;
         return this;
@@ -323,7 +342,8 @@ public class MultiLeaderConfig {
 
       public Replication build() {
         return new Replication(
-            maxRequestPerBatch,
+            maxRequestNumPerBatch,
+            maxSizePerBatch,
             maxPendingBatch,
             maxWaitingTimeForAccumulatingBatchInMs,
             basicRetryWaitTimeMs,
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index e965973459..46f22472aa 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -99,7 +99,7 @@ public class MultiLeaderServerImpl {
   private final LogDispatcher logDispatcher;
   private final MultiLeaderConfig config;
   private final ConsensusReqReader reader;
-  private boolean active;
+  private volatile boolean active;
   private String latestSnapshotId;
   private final IClientManager<TEndPoint, SyncMultiLeaderServiceClient> syncClientManager;
   private final MultiLeaderServerMetrics metrics;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 606731e6a7..4880e2aae4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -59,6 +58,7 @@ import java.util.stream.Collectors;
 
 /** Manage all asynchronous replication threads and corresponding async clients */
 public class LogDispatcher {
+
   private static final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
   private static final long DEFAULT_INITIAL_SYNC_INDEX = 0L;
   private final MultiLeaderServerImpl impl;
@@ -156,7 +156,7 @@ public class LogDispatcher {
   }
 
   public void offer(IndexedConsensusRequest request) {
-    List<ByteBuffer> serializedRequests = request.buildSerializedRequests();
+    request.buildSerializedRequests();
     // we put the serialization step outside the synchronized block because it is stateless and
     // time-consuming
     synchronized (this) {
@@ -167,8 +167,7 @@ public class LogDispatcher {
                 impl.getThisNode().getGroupId(),
                 thread.getPeer().getEndpoint().getIp(),
                 thread.getPendingRequestSize());
-            if (!thread.offer(
-                new IndexedConsensusRequest(serializedRequests, request.getSearchIndex()))) {
+            if (!thread.offer(request)) {
               logger.debug(
                   "{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}",
                   impl.getThisNode().getGroupId(),
@@ -180,6 +179,7 @@ public class LogDispatcher {
   }
 
   public class LogDispatcherThread implements Runnable {
+
     private static final long PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC = 10;
     private static final long START_INDEX = 1;
     private final MultiLeaderConfig config;
@@ -198,7 +198,7 @@ public class LogDispatcher {
         MultiLeaderMemoryManager.getInstance();
     private volatile boolean stopped = false;
 
-    private ConsensusReqReader.ReqIterator walEntryIterator;
+    private final ConsensusReqReader.ReqIterator walEntryIterator;
 
     private final LogDispatcherThreadMetrics metrics;
 
@@ -300,7 +300,7 @@ public class LogDispatcher {
             if (request != null) {
               bufferedRequest.add(request);
               // If write pressure is low, we simply sleep a little to reduce the number of RPC
-              if (pendingRequest.size() <= config.getReplication().getMaxRequestPerBatch()) {
+              if (pendingRequest.size() <= config.getReplication().getMaxRequestNumPerBatch()) {
                 Thread.sleep(config.getReplication().getMaxWaitingTimeForAccumulatingBatchInMs());
               }
             }
@@ -342,12 +342,10 @@ public class LogDispatcher {
     }
 
     public PendingBatch getBatch() {
-      PendingBatch batch;
-      List<TLogBatch> logBatches = new ArrayList<>();
       long startIndex = syncStatus.getNextSendingIndex();
       long maxIndexWhenBufferedRequestEmpty = startIndex;
       logger.debug("[GetBatch] startIndex: {}", startIndex);
-      if (bufferedRequest.size() <= config.getReplication().getMaxRequestPerBatch()) {
+      if (bufferedRequest.size() <= config.getReplication().getMaxRequestNumPerBatch()) {
         // Use drainTo instead of poll to reduce lock overhead
         logger.debug(
             "{} : pendingRequest Size: {}, bufferedRequest size: {}",
@@ -357,7 +355,7 @@ public class LogDispatcher {
         synchronized (impl.getIndexObject()) {
           pendingRequest.drainTo(
               bufferedRequest,
-              config.getReplication().getMaxRequestPerBatch() - bufferedRequest.size());
+              config.getReplication().getMaxRequestNumPerBatch() - bufferedRequest.size());
           maxIndexWhenBufferedRequestEmpty = impl.getIndex() + 1;
         }
         // remove all request that searchIndex < startIndex
@@ -372,48 +370,57 @@ public class LogDispatcher {
           }
         }
       }
+      PendingBatch batches = new PendingBatch(config);
       // This condition will be executed in several scenarios:
       // 1. restart
       // 2. The getBatch() is invoked immediately at the moment the PendingRequests are consumed
       // up. To prevent inconsistency here, we use the synchronized logic when calculate value of
       // `maxIndexWhenBufferedRequestEmpty`
       if (bufferedRequest.isEmpty()) {
-        constructBatchFromWAL(startIndex, maxIndexWhenBufferedRequestEmpty, logBatches);
-        batch = new PendingBatch(logBatches);
+        constructBatchFromWAL(startIndex, maxIndexWhenBufferedRequestEmpty, batches);
+        batches.buildIndex();
         logger.debug(
-            "{} : accumulated a {} from wal when empty", impl.getThisNode().getGroupId(), batch);
+            "{} : accumulated a {} from wal when empty", impl.getThisNode().getGroupId(), batches);
       } else {
         // Notice that prev searchIndex >= startIndex
         Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
         IndexedConsensusRequest prev = iterator.next();
+
         // Prevents gap between logs. For example, some requests are not written into the queue when
         // the queue is full. In this case, requests need to be loaded from the WAL
-        constructBatchFromWAL(startIndex, prev.getSearchIndex(), logBatches);
-        if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) {
-          batch = new PendingBatch(logBatches);
-          logger.debug("{} : accumulated a {} from wal", impl.getThisNode().getGroupId(), batch);
-          return batch;
+        constructBatchFromWAL(startIndex, prev.getSearchIndex(), batches);
+        if (!batches.canAccumulate()) {
+          batches.buildIndex();
+          logger.debug("{} : accumulated a {} from wal", impl.getThisNode().getGroupId(), batches);
+          return batches;
         }
-        constructBatchIndexedFromConsensusRequest(prev, logBatches);
+
+        constructBatchIndexedFromConsensusRequest(prev, batches);
         iterator.remove();
         releaseReservedMemory(prev);
-        while (iterator.hasNext()
-            && logBatches.size() <= config.getReplication().getMaxRequestPerBatch()) {
+        if (!batches.canAccumulate()) {
+          batches.buildIndex();
+          logger.debug(
+              "{} : accumulated a {} from queue", impl.getThisNode().getGroupId(), batches);
+          return batches;
+        }
+
+        while (iterator.hasNext() && batches.canAccumulate()) {
           IndexedConsensusRequest current = iterator.next();
           // Prevents gap between logs. For example, some logs are not written into the queue when
           // the queue is full. In this case, requests need to be loaded from the WAL
           if (current.getSearchIndex() != prev.getSearchIndex() + 1) {
-            constructBatchFromWAL(prev.getSearchIndex(), current.getSearchIndex(), logBatches);
-            if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) {
-              batch = new PendingBatch(logBatches);
+            constructBatchFromWAL(prev.getSearchIndex(), current.getSearchIndex(), batches);
+            if (!batches.canAccumulate()) {
+              batches.buildIndex();
               logger.debug(
                   "gap {} : accumulated a {} from queue and wal when gap",
                   impl.getThisNode().getGroupId(),
-                  batch);
-              return batch;
+                  batches);
+              return batches;
             }
           }
-          constructBatchIndexedFromConsensusRequest(current, logBatches);
+          constructBatchIndexedFromConsensusRequest(current, batches);
           prev = current;
           // We might not be able to remove all the elements in the bufferedRequest in the
           // current function, but that's fine, we'll continue processing these elements in the
@@ -421,11 +428,11 @@ public class LogDispatcher {
           iterator.remove();
           releaseReservedMemory(current);
         }
-        batch = new PendingBatch(logBatches);
+        batches.buildIndex();
         logger.debug(
-            "{} : accumulated a {} from queue and wal", impl.getThisNode().getGroupId(), batch);
+            "{} : accumulated a {} from queue and wal", impl.getThisNode().getGroupId(), batches);
       }
-      return batch;
+      return batches;
     }
 
     public void sendBatchAsync(PendingBatch batch, DispatchLogHandler handler) {
@@ -450,8 +457,7 @@ public class LogDispatcher {
       return syncStatus;
     }
 
-    private long constructBatchFromWAL(
-        long currentIndex, long maxIndex, List<TLogBatch> logBatches) {
+    private void constructBatchFromWAL(long currentIndex, long maxIndex, PendingBatch logBatches) {
       logger.debug(
           String.format(
               "DataRegion[%s]->%s: currentIndex: %d, maxIndex: %d",
@@ -460,8 +466,7 @@ public class LogDispatcher {
       long targetIndex = currentIndex;
       // Even if there is no WAL files, these code won't produce error.
       walEntryIterator.skipTo(targetIndex);
-      while (targetIndex < maxIndex
-          && logBatches.size() < config.getReplication().getMaxRequestPerBatch()) {
+      while (targetIndex < maxIndex && logBatches.canAccumulate()) {
         logger.debug("construct from WAL for one Entry, index : {}", targetIndex);
         try {
           walEntryIterator.waitForNextReady();
@@ -490,19 +495,16 @@ public class LogDispatcher {
         targetIndex = data.getSearchIndex() + 1;
         // construct request from wal
         for (IConsensusRequest innerRequest : data.getRequests()) {
-          logBatches.add(
+          logBatches.addTLogBatch(
               new TLogBatch(innerRequest.serializeToByteBuffer(), data.getSearchIndex(), true));
         }
       }
-      return logBatches.size() > 0
-          ? logBatches.get(logBatches.size() - 1).searchIndex
-          : currentIndex;
     }
 
     private void constructBatchIndexedFromConsensusRequest(
-        IndexedConsensusRequest request, List<TLogBatch> logBatches) {
+        IndexedConsensusRequest request, PendingBatch logBatches) {
       for (ByteBuffer innerRequest : request.getSerializedRequests()) {
-        logBatches.add(new TLogBatch(innerRequest, request.getSearchIndex(), false));
+        logBatches.addTLogBatch(new TLogBatch(innerRequest, request.getSearchIndex(), false));
       }
     }
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java
index a6ee43392f..920e781b09 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java
@@ -19,28 +19,48 @@
 
 package org.apache.iotdb.consensus.multileader.logdispatcher;
 
+import org.apache.iotdb.consensus.config.MultiLeaderConfig;
 import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
 
+import java.util.ArrayList;
 import java.util.List;
 
 public class PendingBatch {
 
-  private final long startIndex;
-  private final long endIndex;
-  private final List<TLogBatch> batches;
+  private final MultiLeaderConfig config;
+
+  private long startIndex;
+  private long endIndex;
+
+  private final List<TLogBatch> batches = new ArrayList<>();
+
+  private long serializedSize;
   // indicates whether this batch has been successfully synchronized to another node
   private boolean synced;
 
-  public PendingBatch(List<TLogBatch> batches) {
+  public PendingBatch(MultiLeaderConfig config) {
+    this.config = config;
+  }
+
+  /*
+  Note: this method must be called once after all the `addTLogBatch` functions have been called
+   */
+  public void buildIndex() {
     if (!batches.isEmpty()) {
       this.startIndex = batches.get(0).getSearchIndex();
       this.endIndex = batches.get(batches.size() - 1).getSearchIndex();
-    } else {
-      this.startIndex = 0;
-      this.endIndex = 0;
     }
-    this.batches = batches;
-    this.synced = false;
+  }
+
+  public void addTLogBatch(TLogBatch batch) {
+    batches.add(batch);
+    // TODO Maybe we need to add in additional fields for more accurate calculations
+    serializedSize += batch.getData() == null ? 0 : batch.getData().length;
+  }
+
+  public boolean canAccumulate() {
+    return batches.size() < config.getReplication().getMaxRequestNumPerBatch()
+        && serializedSize < config.getReplication().getMaxSizePerBatch();
   }
 
   public long getStartIndex() {
@@ -76,6 +96,8 @@ public class PendingBatch {
         + endIndex
         + ", size="
         + batches.size()
+        + ", serializedSize="
+        + serializedSize
         + '}';
   }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
index e9901d931a..3549c1158b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
@@ -30,7 +30,7 @@ public class SyncStatus {
 
   private final MultiLeaderConfig config;
   private final IndexController controller;
-  private final List<PendingBatch> pendingBatches = new LinkedList<>();
+  private final LinkedList<PendingBatch> pendingBatches = new LinkedList<>();
 
   public SyncStatus(IndexController controller, MultiLeaderConfig config) {
     this.controller = controller;
@@ -80,7 +80,7 @@ public class SyncStatus {
       return 1
           + (pendingBatches.isEmpty()
               ? controller.getCurrentIndex()
-              : pendingBatches.get(pendingBatches.size() - 1).getEndIndex());
+              : pendingBatches.getLast().getEndIndex());
     }
   }
 
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
index c4363428fb..bec9c6c955 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
@@ -31,7 +31,6 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -66,7 +65,9 @@ public class SyncStatusTest {
     for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
       TLogBatch logBatch = new TLogBatch();
       logBatch.setSearchIndex(i);
-      PendingBatch batch = new PendingBatch(Collections.singletonList(logBatch));
+      PendingBatch batch = new PendingBatch(MultiLeaderConfig.newBuilder().build());
+      batch.addTLogBatch(logBatch);
+      batch.buildIndex();
       batchList.add(batch);
       status.addNextBatch(batch);
     }
@@ -95,7 +96,9 @@ public class SyncStatusTest {
     for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
       TLogBatch logBatch = new TLogBatch();
       logBatch.setSearchIndex(i);
-      PendingBatch batch = new PendingBatch(Collections.singletonList(logBatch));
+      PendingBatch batch = new PendingBatch(MultiLeaderConfig.newBuilder().build());
+      batch.addTLogBatch(logBatch);
+      batch.buildIndex();
       batchList.add(batch);
       status.addNextBatch(batch);
     }
@@ -130,7 +133,9 @@ public class SyncStatusTest {
     for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
       TLogBatch logBatch = new TLogBatch();
       logBatch.setSearchIndex(i);
-      PendingBatch batch = new PendingBatch(Collections.singletonList(logBatch));
+      PendingBatch batch = new PendingBatch(MultiLeaderConfig.newBuilder().build());
+      batch.addTLogBatch(logBatch);
+      batch.buildIndex();
       batchList.add(batch);
       status.addNextBatch(batch);
     }
@@ -176,7 +181,9 @@ public class SyncStatusTest {
     for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
       TLogBatch logBatch = new TLogBatch();
       logBatch.setSearchIndex(i);
-      PendingBatch batch = new PendingBatch(Collections.singletonList(logBatch));
+      PendingBatch batch = new PendingBatch(MultiLeaderConfig.newBuilder().build());
+      batch.addTLogBatch(logBatch);
+      batch.buildIndex();
       batchList.add(batch);
       status.addNextBatch(batch);
     }
@@ -195,7 +202,9 @@ public class SyncStatusTest {
             () -> {
               TLogBatch logBatch = new TLogBatch();
               logBatch.setSearchIndex(config.getReplication().getMaxPendingBatch());
-              PendingBatch batch = new PendingBatch(Collections.singletonList(logBatch));
+              PendingBatch batch = new PendingBatch(MultiLeaderConfig.newBuilder().build());
+              batch.addTLogBatch(logBatch);
+              batch.buildIndex();
               batchList.add(batch);
               try {
                 status.addNextBatch(batch);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index be8e90b914..6ce2eeff80 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -92,7 +92,7 @@ public class InsertMultiTabletsNode extends InsertNode implements BatchInsertNod
   List<InsertTabletNode> insertTabletNodeList;
 
   /** record the result of insert tablets */
-  private Map<Integer, TSStatus> results = new HashMap<>();
+  private final Map<Integer, TSStatus> results = new HashMap<>();
 
   public InsertMultiTabletsNode(PlanNodeId id) {
     super(id);