You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/11/09 13:20:07 UTC

[iotdb] branch multi_leader_memory_pendingBatch_control created (now e6aa8dea1f)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch multi_leader_memory_pendingBatch_control
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at e6aa8dea1f Control the size of each pending batch to be no larger than maxSizePerBatch

This branch includes the following new commits:

     new e6aa8dea1f Control the size of each pending batch to be no larger than maxSizePerBatch

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Control the size of each pending batch to be no larger than maxSizePerBatch

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch multi_leader_memory_pendingBatch_control
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e6aa8dea1f123a222cc6218b53d9a66c9adccb77
Author: OneSizeFitQuorum <ta...@apache.org>
AuthorDate: Wed Nov 9 21:19:35 2022 +0800

    Control the size of each pending batch to be no larger than maxSizePerBatch
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
 .../common/request/IndexedConsensusRequest.java    | 26 ++++-----
 .../iotdb/consensus/config/MultiLeaderConfig.java  | 38 +++++++++---
 .../multileader/MultiLeaderServerImpl.java         |  2 +-
 .../multileader/client/DispatchLogHandler.java     |  2 +-
 .../logdispatcher/AccumulatingBatch.java           | 67 ++++++++++++++++++++++
 .../multileader/logdispatcher/LogDispatcher.java   | 59 ++++++++++---------
 .../multileader/logdispatcher/PendingBatch.java    | 23 ++++----
 .../multileader/logdispatcher/SyncStatus.java      |  4 +-
 .../multileader/logdispatcher/SyncStatusTest.java  | 15 +++--
 .../plan/node/write/InsertMultiTabletsNode.java    |  2 +-
 10 files changed, 167 insertions(+), 71 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index 6abca549b6..ca236da1c3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.consensus.common.request;
 
 import java.nio.ByteBuffer;
-import java.util.LinkedList;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
@@ -31,8 +31,8 @@ public class IndexedConsensusRequest implements IConsensusRequest {
   private final long searchIndex;
 
   private final long syncIndex;
-  private List<IConsensusRequest> requests;
-  private List<ByteBuffer> serializedRequests;
+  private final List<IConsensusRequest> requests;
+  private final List<ByteBuffer> serializedRequests = new ArrayList<>();
   private long serializedSize = 0;
 
   public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> requests) {
@@ -41,15 +41,6 @@ public class IndexedConsensusRequest implements IConsensusRequest {
     this.syncIndex = -1L;
   }
 
-  public IndexedConsensusRequest(List<ByteBuffer> serializedRequests, long searchIndex) {
-    this.searchIndex = searchIndex;
-    this.serializedRequests = serializedRequests;
-    for (ByteBuffer byteBuffer : serializedRequests) {
-      serializedSize += byteBuffer.capacity();
-    }
-    this.syncIndex = -1L;
-  }
-
   public IndexedConsensusRequest(
       long searchIndex, long syncIndex, List<IConsensusRequest> requests) {
     this.searchIndex = searchIndex;
@@ -70,10 +61,13 @@ public class IndexedConsensusRequest implements IConsensusRequest {
     return serializedRequests;
   }
 
-  public List<ByteBuffer> buildSerializedRequests() {
-    List<ByteBuffer> result = new LinkedList<>();
-    this.requests.forEach(r -> result.add(r.serializeToByteBuffer()));
-    return result;
+  public void buildSerializedRequests() {
+    this.requests.forEach(
+        r -> {
+          ByteBuffer buffer = r.serializeToByteBuffer();
+          this.serializedRequests.add(buffer);
+          this.serializedSize += buffer.capacity();
+        });
   }
 
   public long getSerializedSize() {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index cd0d58c4ae..8c7abec0ce 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -196,7 +196,9 @@ public class MultiLeaderConfig {
   }
 
   public static class Replication {
-    private final int maxRequestPerBatch;
+    private final int maxRequestNumPerBatch;
+
+    private final int maxSizePerBatch;
     private final int maxPendingBatch;
     private final int maxWaitingTimeForAccumulatingBatchInMs;
     private final long basicRetryWaitTimeMs;
@@ -207,7 +209,8 @@ public class MultiLeaderConfig {
     private final Long allocateMemoryForConsensus;
 
     private Replication(
-        int maxRequestPerBatch,
+        int maxRequestNumPerBatch,
+        int maxSizePerBatch,
         int maxPendingBatch,
         int maxWaitingTimeForAccumulatingBatchInMs,
         long basicRetryWaitTimeMs,
@@ -216,7 +219,8 @@ public class MultiLeaderConfig {
         long throttleTimeOutMs,
         long checkpointGap,
         long allocateMemoryForConsensus) {
-      this.maxRequestPerBatch = maxRequestPerBatch;
+      this.maxRequestNumPerBatch = maxRequestNumPerBatch;
+      this.maxSizePerBatch = maxSizePerBatch;
       this.maxPendingBatch = maxPendingBatch;
       this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs;
       this.basicRetryWaitTimeMs = basicRetryWaitTimeMs;
@@ -227,8 +231,12 @@ public class MultiLeaderConfig {
       this.allocateMemoryForConsensus = allocateMemoryForConsensus;
     }
 
-    public int getMaxRequestPerBatch() {
-      return maxRequestPerBatch;
+    public int getMaxRequestNumPerBatch() {
+      return maxRequestNumPerBatch;
+    }
+
+    public int getMaxSizePerBatch() {
+      return maxSizePerBatch;
     }
 
     public int getMaxPendingBatch() {
@@ -268,7 +276,8 @@ public class MultiLeaderConfig {
     }
 
     public static class Builder {
-      private int maxRequestPerBatch = 30;
+      private int maxRequestNumPerBatch = 30;
+      private int maxSizePerBatch = 4 * 1024 * 1024;
       // (IMPORTANT) Value of this variable should be the same with MAX_REQUEST_CACHE_SIZE
       // in DataRegionStateMachine
       private int maxPendingBatch = 5;
@@ -280,8 +289,13 @@ public class MultiLeaderConfig {
       private long checkpointGap = 500;
       private long allocateMemoryForConsensus;
 
-      public Replication.Builder setMaxRequestPerBatch(int maxRequestPerBatch) {
-        this.maxRequestPerBatch = maxRequestPerBatch;
+      public Replication.Builder setMaxRequestNumPerBatch(int maxRequestNumPerBatch) {
+        this.maxRequestNumPerBatch = maxRequestNumPerBatch;
+        return this;
+      }
+
+      public Builder setMaxSizePerBatch(int maxSizePerBatch) {
+        this.maxSizePerBatch = maxSizePerBatch;
         return this;
       }
 
@@ -316,6 +330,11 @@ public class MultiLeaderConfig {
         return this;
       }
 
+      public Builder setCheckpointGap(long checkpointGap) {
+        this.checkpointGap = checkpointGap;
+        return this;
+      }
+
       public Replication.Builder setAllocateMemoryForConsensus(long allocateMemoryForConsensus) {
         this.allocateMemoryForConsensus = allocateMemoryForConsensus;
         return this;
@@ -323,7 +342,8 @@ public class MultiLeaderConfig {
 
       public Replication build() {
         return new Replication(
-            maxRequestPerBatch,
+            maxRequestNumPerBatch,
+            maxSizePerBatch,
             maxPendingBatch,
             maxWaitingTimeForAccumulatingBatchInMs,
             basicRetryWaitTimeMs,
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index e965973459..46f22472aa 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -99,7 +99,7 @@ public class MultiLeaderServerImpl {
   private final LogDispatcher logDispatcher;
   private final MultiLeaderConfig config;
   private final ConsensusReqReader reader;
-  private boolean active;
+  private volatile boolean active;
   private String latestSnapshotId;
   private final IClientManager<TEndPoint, SyncMultiLeaderServiceClient> syncClientManager;
   private final MultiLeaderServerMetrics metrics;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
index 705aa980e9..5d733f9cc3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
@@ -74,7 +74,7 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogRes> {
             "syncLogTimePerRequest",
             Tag.REGION.toString(),
             this.thread.getPeer().getGroupId().toString())
-        .update((System.currentTimeMillis() - createTime) / batch.getBatches().size());
+        .update((System.currentTimeMillis() - createTime) / batch.getBatch().getBatchList().size());
   }
 
   private boolean needRetry(int statusCode) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/AccumulatingBatch.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/AccumulatingBatch.java
new file mode 100644
index 0000000000..afc0537512
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/AccumulatingBatch.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.logdispatcher;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.config.MultiLeaderConfig;
+import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class AccumulatingBatch {
+
+  private final MultiLeaderConfig config;
+  private final List<TLogBatch> batchList;
+  private long serializedSize;
+
+  public AccumulatingBatch(MultiLeaderConfig config) {
+    this.config = config;
+    this.batchList = new ArrayList<>();
+    this.serializedSize = 0;
+  }
+
+  @TestOnly
+  public AccumulatingBatch(List<TLogBatch> batches) {
+    this.config = MultiLeaderConfig.newBuilder().build();
+    this.batchList = batches;
+    this.serializedSize =
+        batches.stream().mapToLong(x -> x.getData() == null ? 0 : x.getData().length).sum();
+  }
+
+  public void addTLogBatch(TLogBatch batch) {
+    batchList.add(batch);
+    // TODO Maybe we need to add in additional fields for more accurate calculations
+    serializedSize += batch.getData().length;
+  }
+
+  public boolean canAccumulate() {
+    return batchList.size() < config.getReplication().getMaxRequestNumPerBatch()
+        && serializedSize < config.getReplication().getMaxSizePerBatch();
+  }
+
+  public List<TLogBatch> getBatchList() {
+    return batchList;
+  }
+
+  public long getSerializedSize() {
+    return serializedSize;
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 606731e6a7..198284dc78 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -59,6 +58,7 @@ import java.util.stream.Collectors;
 
 /** Manage all asynchronous replication threads and corresponding async clients */
 public class LogDispatcher {
+
   private static final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
   private static final long DEFAULT_INITIAL_SYNC_INDEX = 0L;
   private final MultiLeaderServerImpl impl;
@@ -156,7 +156,7 @@ public class LogDispatcher {
   }
 
   public void offer(IndexedConsensusRequest request) {
-    List<ByteBuffer> serializedRequests = request.buildSerializedRequests();
+    request.buildSerializedRequests();
     // we put the serialization step outside the synchronized block because it is stateless and
     // time-consuming
     synchronized (this) {
@@ -167,8 +167,7 @@ public class LogDispatcher {
                 impl.getThisNode().getGroupId(),
                 thread.getPeer().getEndpoint().getIp(),
                 thread.getPendingRequestSize());
-            if (!thread.offer(
-                new IndexedConsensusRequest(serializedRequests, request.getSearchIndex()))) {
+            if (!thread.offer(request)) {
               logger.debug(
                   "{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}",
                   impl.getThisNode().getGroupId(),
@@ -180,6 +179,7 @@ public class LogDispatcher {
   }
 
   public class LogDispatcherThread implements Runnable {
+
     private static final long PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC = 10;
     private static final long START_INDEX = 1;
     private final MultiLeaderConfig config;
@@ -198,7 +198,7 @@ public class LogDispatcher {
         MultiLeaderMemoryManager.getInstance();
     private volatile boolean stopped = false;
 
-    private ConsensusReqReader.ReqIterator walEntryIterator;
+    private final ConsensusReqReader.ReqIterator walEntryIterator;
 
     private final LogDispatcherThreadMetrics metrics;
 
@@ -300,7 +300,7 @@ public class LogDispatcher {
             if (request != null) {
               bufferedRequest.add(request);
               // If write pressure is low, we simply sleep a little to reduce the number of RPC
-              if (pendingRequest.size() <= config.getReplication().getMaxRequestPerBatch()) {
+              if (pendingRequest.size() <= config.getReplication().getMaxRequestNumPerBatch()) {
                 Thread.sleep(config.getReplication().getMaxWaitingTimeForAccumulatingBatchInMs());
               }
             }
@@ -315,7 +315,9 @@ public class LogDispatcher {
                   "constructBatch",
                   Tag.REGION.toString(),
                   peer.getGroupId().toString())
-              .update((System.currentTimeMillis() - startTime) / batch.getBatches().size());
+              .update(
+                  (System.currentTimeMillis() - startTime)
+                      / batch.getBatch().getBatchList().size());
           // we may block here if the synchronization pipeline is full
           syncStatus.addNextBatch(batch);
           // sends batch asynchronously and migrates the retry logic into the callback handler
@@ -342,12 +344,10 @@ public class LogDispatcher {
     }
 
     public PendingBatch getBatch() {
-      PendingBatch batch;
-      List<TLogBatch> logBatches = new ArrayList<>();
       long startIndex = syncStatus.getNextSendingIndex();
       long maxIndexWhenBufferedRequestEmpty = startIndex;
       logger.debug("[GetBatch] startIndex: {}", startIndex);
-      if (bufferedRequest.size() <= config.getReplication().getMaxRequestPerBatch()) {
+      if (bufferedRequest.size() <= config.getReplication().getMaxRequestNumPerBatch()) {
         // Use drainTo instead of poll to reduce lock overhead
         logger.debug(
             "{} : pendingRequest Size: {}, bufferedRequest size: {}",
@@ -357,7 +357,7 @@ public class LogDispatcher {
         synchronized (impl.getIndexObject()) {
           pendingRequest.drainTo(
               bufferedRequest,
-              config.getReplication().getMaxRequestPerBatch() - bufferedRequest.size());
+              config.getReplication().getMaxRequestNumPerBatch() - bufferedRequest.size());
           maxIndexWhenBufferedRequestEmpty = impl.getIndex() + 1;
         }
         // remove all request that searchIndex < startIndex
@@ -372,6 +372,8 @@ public class LogDispatcher {
           }
         }
       }
+      PendingBatch batch;
+      AccumulatingBatch logBatches = new AccumulatingBatch(config);
       // This condition will be executed in several scenarios:
       // 1. restart
       // 2. The getBatch() is invoked immediately at the moment the PendingRequests are consumed
@@ -386,25 +388,32 @@ public class LogDispatcher {
         // Notice that prev searchIndex >= startIndex
         Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
         IndexedConsensusRequest prev = iterator.next();
+
         // Prevents gap between logs. For example, some requests are not written into the queue when
         // the queue is full. In this case, requests need to be loaded from the WAL
         constructBatchFromWAL(startIndex, prev.getSearchIndex(), logBatches);
-        if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) {
+        if (!logBatches.canAccumulate()) {
           batch = new PendingBatch(logBatches);
           logger.debug("{} : accumulated a {} from wal", impl.getThisNode().getGroupId(), batch);
           return batch;
         }
+
         constructBatchIndexedFromConsensusRequest(prev, logBatches);
         iterator.remove();
         releaseReservedMemory(prev);
-        while (iterator.hasNext()
-            && logBatches.size() <= config.getReplication().getMaxRequestPerBatch()) {
+        if (!logBatches.canAccumulate()) {
+          batch = new PendingBatch(logBatches);
+          logger.debug("{} : accumulated a {} from queue", impl.getThisNode().getGroupId(), batch);
+          return batch;
+        }
+
+        while (iterator.hasNext() && logBatches.canAccumulate()) {
           IndexedConsensusRequest current = iterator.next();
           // Prevents gap between logs. For example, some logs are not written into the queue when
           // the queue is full. In this case, requests need to be loaded from the WAL
           if (current.getSearchIndex() != prev.getSearchIndex() + 1) {
             constructBatchFromWAL(prev.getSearchIndex(), current.getSearchIndex(), logBatches);
-            if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) {
+            if (!logBatches.canAccumulate()) {
               batch = new PendingBatch(logBatches);
               logger.debug(
                   "gap {} : accumulated a {} from queue and wal when gap",
@@ -433,7 +442,9 @@ public class LogDispatcher {
         AsyncMultiLeaderServiceClient client = clientManager.borrowClient(peer.getEndpoint());
         TSyncLogReq req =
             new TSyncLogReq(
-                selfPeerId, peer.getGroupId().convertToTConsensusGroupId(), batch.getBatches());
+                selfPeerId,
+                peer.getGroupId().convertToTConsensusGroupId(),
+                batch.getBatch().getBatchList());
         logger.debug(
             "Send Batch[startIndex:{}, endIndex:{}] to ConsensusGroup:{}",
             batch.getStartIndex(),
@@ -450,8 +461,8 @@ public class LogDispatcher {
       return syncStatus;
     }
 
-    private long constructBatchFromWAL(
-        long currentIndex, long maxIndex, List<TLogBatch> logBatches) {
+    private void constructBatchFromWAL(
+        long currentIndex, long maxIndex, AccumulatingBatch logBatches) {
       logger.debug(
           String.format(
               "DataRegion[%s]->%s: currentIndex: %d, maxIndex: %d",
@@ -460,8 +471,7 @@ public class LogDispatcher {
       long targetIndex = currentIndex;
       // Even if there is no WAL files, these code won't produce error.
       walEntryIterator.skipTo(targetIndex);
-      while (targetIndex < maxIndex
-          && logBatches.size() < config.getReplication().getMaxRequestPerBatch()) {
+      while (targetIndex < maxIndex && logBatches.canAccumulate()) {
         logger.debug("construct from WAL for one Entry, index : {}", targetIndex);
         try {
           walEntryIterator.waitForNextReady();
@@ -490,19 +500,16 @@ public class LogDispatcher {
         targetIndex = data.getSearchIndex() + 1;
         // construct request from wal
         for (IConsensusRequest innerRequest : data.getRequests()) {
-          logBatches.add(
+          logBatches.addTLogBatch(
               new TLogBatch(innerRequest.serializeToByteBuffer(), data.getSearchIndex(), true));
         }
       }
-      return logBatches.size() > 0
-          ? logBatches.get(logBatches.size() - 1).searchIndex
-          : currentIndex;
     }
 
     private void constructBatchIndexedFromConsensusRequest(
-        IndexedConsensusRequest request, List<TLogBatch> logBatches) {
+        IndexedConsensusRequest request, AccumulatingBatch logBatches) {
       for (ByteBuffer innerRequest : request.getSerializedRequests()) {
-        logBatches.add(new TLogBatch(innerRequest, request.getSearchIndex(), false));
+        logBatches.addTLogBatch(new TLogBatch(innerRequest, request.getSearchIndex(), false));
       }
     }
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java
index a6ee43392f..71a7f72059 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java
@@ -27,19 +27,20 @@ public class PendingBatch {
 
   private final long startIndex;
   private final long endIndex;
-  private final List<TLogBatch> batches;
+  private final AccumulatingBatch batch;
   // indicates whether this batch has been successfully synchronized to another node
   private boolean synced;
 
-  public PendingBatch(List<TLogBatch> batches) {
-    if (!batches.isEmpty()) {
-      this.startIndex = batches.get(0).getSearchIndex();
-      this.endIndex = batches.get(batches.size() - 1).getSearchIndex();
+  public PendingBatch(AccumulatingBatch batch) {
+    List<TLogBatch> batchList = batch.getBatchList();
+    if (!batchList.isEmpty()) {
+      this.startIndex = batchList.get(0).getSearchIndex();
+      this.endIndex = batchList.get(batchList.size() - 1).getSearchIndex();
     } else {
       this.startIndex = 0;
       this.endIndex = 0;
     }
-    this.batches = batches;
+    this.batch = batch;
     this.synced = false;
   }
 
@@ -51,8 +52,8 @@ public class PendingBatch {
     return endIndex;
   }
 
-  public List<TLogBatch> getBatches() {
-    return batches;
+  public AccumulatingBatch getBatch() {
+    return batch;
   }
 
   public boolean isSynced() {
@@ -64,7 +65,7 @@ public class PendingBatch {
   }
 
   public boolean isEmpty() {
-    return batches.isEmpty();
+    return batch.getBatchList().isEmpty();
   }
 
   @Override
@@ -75,7 +76,9 @@ public class PendingBatch {
         + ", endIndex="
         + endIndex
         + ", size="
-        + batches.size()
+        + batch.getBatchList().size()
+        + ", serializedSize="
+        + batch.getSerializedSize()
         + '}';
   }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
index e9901d931a..3549c1158b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
@@ -30,7 +30,7 @@ public class SyncStatus {
 
   private final MultiLeaderConfig config;
   private final IndexController controller;
-  private final List<PendingBatch> pendingBatches = new LinkedList<>();
+  private final LinkedList<PendingBatch> pendingBatches = new LinkedList<>();
 
   public SyncStatus(IndexController controller, MultiLeaderConfig config) {
     this.controller = controller;
@@ -80,7 +80,7 @@ public class SyncStatus {
       return 1
           + (pendingBatches.isEmpty()
               ? controller.getCurrentIndex()
-              : pendingBatches.get(pendingBatches.size() - 1).getEndIndex());
+              : pendingBatches.getLast().getEndIndex());
     }
   }
 
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
index c4363428fb..d9b591d1b6 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
@@ -66,7 +66,8 @@ public class SyncStatusTest {
     for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
       TLogBatch logBatch = new TLogBatch();
       logBatch.setSearchIndex(i);
-      PendingBatch batch = new PendingBatch(Collections.singletonList(logBatch));
+      PendingBatch batch =
+          new PendingBatch(new AccumulatingBatch(Collections.singletonList(logBatch)));
       batchList.add(batch);
       status.addNextBatch(batch);
     }
@@ -95,7 +96,8 @@ public class SyncStatusTest {
     for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
       TLogBatch logBatch = new TLogBatch();
       logBatch.setSearchIndex(i);
-      PendingBatch batch = new PendingBatch(Collections.singletonList(logBatch));
+      PendingBatch batch =
+          new PendingBatch(new AccumulatingBatch(Collections.singletonList(logBatch)));
       batchList.add(batch);
       status.addNextBatch(batch);
     }
@@ -130,7 +132,8 @@ public class SyncStatusTest {
     for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
       TLogBatch logBatch = new TLogBatch();
       logBatch.setSearchIndex(i);
-      PendingBatch batch = new PendingBatch(Collections.singletonList(logBatch));
+      PendingBatch batch =
+          new PendingBatch(new AccumulatingBatch(Collections.singletonList(logBatch)));
       batchList.add(batch);
       status.addNextBatch(batch);
     }
@@ -176,7 +179,8 @@ public class SyncStatusTest {
     for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
       TLogBatch logBatch = new TLogBatch();
       logBatch.setSearchIndex(i);
-      PendingBatch batch = new PendingBatch(Collections.singletonList(logBatch));
+      PendingBatch batch =
+          new PendingBatch(new AccumulatingBatch(Collections.singletonList(logBatch)));
       batchList.add(batch);
       status.addNextBatch(batch);
     }
@@ -195,7 +199,8 @@ public class SyncStatusTest {
             () -> {
               TLogBatch logBatch = new TLogBatch();
               logBatch.setSearchIndex(config.getReplication().getMaxPendingBatch());
-              PendingBatch batch = new PendingBatch(Collections.singletonList(logBatch));
+              PendingBatch batch =
+                  new PendingBatch(new AccumulatingBatch(Collections.singletonList(logBatch)));
               batchList.add(batch);
               try {
                 status.addNextBatch(batch);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index be8e90b914..6ce2eeff80 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -92,7 +92,7 @@ public class InsertMultiTabletsNode extends InsertNode implements BatchInsertNod
   List<InsertTabletNode> insertTabletNodeList;
 
   /** record the result of insert tablets */
-  private Map<Integer, TSStatus> results = new HashMap<>();
+  private final Map<Integer, TSStatus> results = new HashMap<>();
 
   public InsertMultiTabletsNode(PlanNodeId id) {
     super(id);