You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/08/11 04:08:50 UTC

[iotdb] branch master updated: Optimize MultileaderConsensus SyncLog (#6936)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3033146fa0 Optimize MultileaderConsensus SyncLog (#6936)
3033146fa0 is described below

commit 3033146fa01ce158efadf825787bc6b8d569892c
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Thu Aug 11 12:08:45 2022 +0800

    Optimize MultileaderConsensus SyncLog (#6936)
    
    Co-authored-by: ZhangHongYin <46...@users.noreply.github.com>
---
 ...uest.java => BatchIndexedConsensusRequest.java} |  53 ++++---
 .../common/request/IndexedConsensusRequest.java    |  33 ++++-
 .../iotdb/consensus/config/MultiLeaderConfig.java  |   8 +-
 .../multileader/MultiLeaderServerImpl.java         |  24 +++-
 .../multileader/logdispatcher/LogDispatcher.java   |  39 +++--
 .../service/MultiLeaderRPCServiceProcessor.java    |  52 +++----
 .../multileader/logdispatcher/SyncStatusTest.java  |   4 +-
 .../multileader/util/TestStateMachine.java         |  40 ++++--
 .../org/apache/iotdb/commons}/StepTracker.java     |  29 +++-
 .../statemachine/DataRegionStateMachine.java       | 159 +++++++++++++++++++--
 .../java/org/apache/iotdb/db/wal/node/WALNode.java |  10 +-
 11 files changed, 350 insertions(+), 101 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java
similarity index 50%
copy from consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
copy to consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java
index de3aca433b..a4a38fc84f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java
@@ -20,49 +20,44 @@
 package org.apache.iotdb.consensus.common.request;
 
 import java.nio.ByteBuffer;
+import java.util.LinkedList;
 import java.util.List;
-import java.util.Objects;
 
-/** only used for multi-leader consensus. */
-public class IndexedConsensusRequest implements IConsensusRequest {
+public class BatchIndexedConsensusRequest implements IConsensusRequest {
 
-  /** we do not need to serialize these two fields as they are useless in other nodes. */
-  private final long searchIndex;
+  private boolean isStartSyncIndexInitialized;
+  private long startSyncIndex;
+  private long endSyncIndex;
+  private final List<IndexedConsensusRequest> requests;
 
-  private final List<IConsensusRequest> requests;
-
-  public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> requests) {
-    this.searchIndex = searchIndex;
-    this.requests = requests;
+  public BatchIndexedConsensusRequest() {
+    this.requests = new LinkedList<>();
+    this.isStartSyncIndexInitialized = false;
   }
 
-  @Override
-  public ByteBuffer serializeToByteBuffer() {
-    throw new UnsupportedOperationException();
+  public void add(IndexedConsensusRequest request) {
+    if (!isStartSyncIndexInitialized) {
+      startSyncIndex = request.getSyncIndex();
+      isStartSyncIndexInitialized = true;
+    }
+    endSyncIndex = request.getSyncIndex();
+    this.requests.add(request);
   }
 
-  public List<IConsensusRequest> getRequests() {
-    return requests;
+  public long getStartSyncIndex() {
+    return startSyncIndex;
   }
 
-  public long getSearchIndex() {
-    return searchIndex;
+  public long getEndSyncIndex() {
+    return endSyncIndex;
   }
 
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    IndexedConsensusRequest that = (IndexedConsensusRequest) o;
-    return searchIndex == that.searchIndex && requests.equals(that.requests);
+  public List<IndexedConsensusRequest> getRequests() {
+    return requests;
   }
 
   @Override
-  public int hashCode() {
-    return Objects.hash(searchIndex, requests);
+  public ByteBuffer serializeToByteBuffer() {
+    return null;
   }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index de3aca433b..1a61cf3a09 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.consensus.common.request;
 
 import java.nio.ByteBuffer;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
 
@@ -29,11 +30,27 @@ public class IndexedConsensusRequest implements IConsensusRequest {
   /** we do not need to serialize these two fields as they are useless in other nodes. */
   private final long searchIndex;
 
-  private final List<IConsensusRequest> requests;
+  private final long syncIndex;
+  private List<IConsensusRequest> requests;
+  private List<ByteBuffer> serializedRequests;
 
   public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> requests) {
     this.searchIndex = searchIndex;
     this.requests = requests;
+    this.syncIndex = -1L;
+  }
+
+  public IndexedConsensusRequest(List<ByteBuffer> serializedRequests, long searchIndex) {
+    this.searchIndex = searchIndex;
+    this.serializedRequests = serializedRequests;
+    this.syncIndex = -1L;
+  }
+
+  public IndexedConsensusRequest(
+      long searchIndex, long syncIndex, List<IConsensusRequest> requests) {
+    this.searchIndex = searchIndex;
+    this.requests = requests;
+    this.syncIndex = syncIndex;
   }
 
   @Override
@@ -45,10 +62,24 @@ public class IndexedConsensusRequest implements IConsensusRequest {
     return requests;
   }
 
+  public List<ByteBuffer> getSerializedRequests() {
+    return serializedRequests;
+  }
+
+  public List<ByteBuffer> buildSerializedRequests() {
+    List<ByteBuffer> result = new LinkedList<>();
+    this.requests.forEach(r -> result.add(r.serializeToByteBuffer()));
+    return result;
+  }
+
   public long getSearchIndex() {
     return searchIndex;
   }
 
+  public long getSyncIndex() {
+    return syncIndex;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index e4f66f557b..9b4a72a4f1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -247,9 +247,11 @@ public class MultiLeaderConfig {
     }
 
     public static class Builder {
-      private int maxPendingRequestNumPerNode = 200;
-      private int maxRequestPerBatch = 40;
-      private int maxPendingBatch = 1;
+      private int maxPendingRequestNumPerNode = 600;
+      private int maxRequestPerBatch = 30;
+      // (IMPORTANT) The value of this variable should be the same as MAX_REQUEST_CACHE_SIZE
+      // in DataRegionStateMachine
+      private int maxPendingBatch = 5;
       private int maxWaitingTimeForAccumulatingBatchInMs = 500;
       private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
       private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 3e8cbbfb3b..6ef1fab4ae 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -122,16 +122,23 @@ public class MultiLeaderServerImpl {
       // TODO wal and memtable
       TSStatus result = stateMachine.write(indexedConsensusRequest);
       if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        logDispatcher.offer(indexedConsensusRequest);
+        // The index is used when constructing a batch in LogDispatcher. If its value
+        // increases but the corresponding request does not exist or has not been put into
+        // the queue, the dispatcher will try to find the request in the WAL. This behavior
+        // is not expected and will slow down batch preparation.
+        // So we need to use the lock to ensure that `offer()` and `incrementAndGet()` are
+        // executed as one atomic step.
+        synchronized (index) {
+          logDispatcher.offer(indexedConsensusRequest);
+          index.incrementAndGet();
+        }
       } else {
         logger.debug(
             "{}: write operation failed. searchIndex: {}. Code: {}",
             thisNode.getGroupId(),
             indexedConsensusRequest.getSearchIndex(),
             result.getCode());
-        index.decrementAndGet();
       }
-
       return result;
     }
   }
@@ -182,12 +189,13 @@ public class MultiLeaderServerImpl {
 
   public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(
       IConsensusRequest request) {
-    return new IndexedConsensusRequest(index.incrementAndGet(), Collections.singletonList(request));
+    return new IndexedConsensusRequest(index.get() + 1, Collections.singletonList(request));
   }
 
   public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest(
-      List<IConsensusRequest> requests) {
-    return new IndexedConsensusRequest(ConsensusReqReader.DEFAULT_SEARCH_INDEX, requests);
+      long syncIndex, List<IConsensusRequest> requests) {
+    return new IndexedConsensusRequest(
+        ConsensusReqReader.DEFAULT_SEARCH_INDEX, syncIndex, requests);
   }
 
   /**
@@ -217,4 +225,8 @@ public class MultiLeaderServerImpl {
   public MultiLeaderConfig getConfig() {
     return config;
   }
+
+  public AtomicLong getIndexObject() {
+    return index;
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index ed41ec5e16..c918dac459 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -40,6 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -106,17 +107,22 @@ public class LogDispatcher {
   }
 
   public void offer(IndexedConsensusRequest request) {
+    List<ByteBuffer> serializedRequests = request.buildSerializedRequests();
     threads.forEach(
         thread -> {
           logger.debug(
-              "{}: Push a log to the queue, where the queue length is {}",
+              "{}->{}: Push a log to the queue, where the queue length is {}",
               impl.getThisNode().getGroupId(),
+              thread.getPeer().getEndpoint().getIp(),
               thread.getPendingRequest().size());
-          if (!thread.getPendingRequest().offer(request)) {
+          if (!thread
+              .getPendingRequest()
+              .offer(new IndexedConsensusRequest(serializedRequests, request.getSearchIndex()))) {
             logger.debug(
-                "{}: Log queue of {} is full, ignore the log to this node",
+                "{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}",
                 impl.getThisNode().getGroupId(),
-                thread.getPeer());
+                thread.getPeer(),
+                request.getSearchIndex());
           }
         });
   }
@@ -222,6 +228,7 @@ public class LogDispatcher {
       PendingBatch batch;
       List<TLogBatch> logBatches = new ArrayList<>();
       long startIndex = syncStatus.getNextSendingIndex();
+      long maxIndexWhenBufferedRequestEmpty = startIndex;
       logger.debug("[GetBatch] startIndex: {}", startIndex);
       long endIndex;
       if (bufferedRequest.size() <= config.getReplication().getMaxRequestPerBatch()) {
@@ -231,9 +238,12 @@ public class LogDispatcher {
             impl.getThisNode().getGroupId(),
             pendingRequest.size(),
             bufferedRequest.size());
-        pendingRequest.drainTo(
-            bufferedRequest,
-            config.getReplication().getMaxRequestPerBatch() - bufferedRequest.size());
+        synchronized (impl.getIndexObject()) {
+          pendingRequest.drainTo(
+              bufferedRequest,
+              config.getReplication().getMaxRequestPerBatch() - bufferedRequest.size());
+          maxIndexWhenBufferedRequestEmpty = impl.getIndex() + 1;
+        }
         // remove all request that searchIndex < startIndex
         Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
         while (iterator.hasNext()) {
@@ -245,8 +255,13 @@ public class LogDispatcher {
           }
         }
       }
-      if (bufferedRequest.isEmpty()) { // only execute this after a restart
-        endIndex = constructBatchFromWAL(startIndex, impl.getIndex() + 1, logBatches);
+      // This branch is taken in several scenarios:
+      // 1. After a restart.
+      // 2. getBatch() is invoked at the very moment the PendingRequests are consumed up.
+      // To prevent inconsistency here, the value of `maxIndexWhenBufferedRequestEmpty` is
+      // calculated inside the synchronized block.
+      if (bufferedRequest.isEmpty()) {
+        endIndex = constructBatchFromWAL(startIndex, maxIndexWhenBufferedRequestEmpty, logBatches);
         batch = new PendingBatch(startIndex, endIndex, logBatches);
         logger.debug(
             "{} : accumulated a {} from wal when empty", impl.getThisNode().getGroupId(), batch);
@@ -327,6 +342,7 @@ public class LogDispatcher {
               currentIndex,
               maxIndex,
               iteratorIndex));
+      // Even if there are no WAL files, this code won't produce an error.
       if (iteratorIndex != currentIndex) {
         walEntryiterator.skipTo(currentIndex);
         iteratorIndex = currentIndex;
@@ -355,9 +371,8 @@ public class LogDispatcher {
 
     private void constructBatchIndexedFromConsensusRequest(
         IndexedConsensusRequest request, List<TLogBatch> logBatches) {
-      for (IConsensusRequest innerRequest : request.getRequests()) {
-        logBatches.add(
-            new TLogBatch(innerRequest.serializeToByteBuffer(), request.getSearchIndex(), false));
+      for (ByteBuffer innerRequest : request.getSerializedRequests()) {
+        logBatches.add(new TLogBatch(innerRequest, request.getSearchIndex(), false));
       }
     }
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 3fc63c7d99..70a0387c40 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.consensus.multileader.service;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
@@ -67,36 +68,37 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
         resultHandler.onComplete(new TSyncLogRes(Collections.singletonList(status)));
         return;
       }
-      List<TSStatus> statuses = new ArrayList<>();
+      BatchIndexedConsensusRequest requestsInThisBatch = new BatchIndexedConsensusRequest();
       // We use synchronized to ensure atomicity of executing multiple logs
       if (!req.getBatches().isEmpty()) {
-        synchronized (impl.getStateMachine()) {
-          List<IConsensusRequest> consensusRequests = new ArrayList<>();
-          long currentSearchIndex = req.getBatches().get(0).getSearchIndex();
-          for (TLogBatch batch : req.getBatches()) {
-            IConsensusRequest request =
-                batch.isFromWAL()
-                    ? new MultiLeaderConsensusRequest(batch.data)
-                    : new ByteBufferConsensusRequest(batch.data);
-            // merge TLogBatch with same search index into one request
-            if (batch.getSearchIndex() != currentSearchIndex) {
-              statuses.add(
-                  impl.getStateMachine()
-                      .write(impl.buildIndexedConsensusRequestForRemoteRequest(consensusRequests)));
-              consensusRequests = new ArrayList<>();
-            }
-            consensusRequests.add(request);
-          }
-          // write last request
-          if (!consensusRequests.isEmpty()) {
-            statuses.add(
-                impl.getStateMachine()
-                    .write(impl.buildIndexedConsensusRequestForRemoteRequest(consensusRequests)));
+        List<IConsensusRequest> consensusRequests = new ArrayList<>();
+        long currentSearchIndex = req.getBatches().get(0).getSearchIndex();
+        for (TLogBatch batch : req.getBatches()) {
+          IConsensusRequest request =
+              batch.isFromWAL()
+                  ? new MultiLeaderConsensusRequest(batch.data)
+                  : new ByteBufferConsensusRequest(batch.data);
+          // merge TLogBatch with same search index into one request
+          if (batch.getSearchIndex() != currentSearchIndex) {
+            requestsInThisBatch.add(
+                impl.buildIndexedConsensusRequestForRemoteRequest(
+                    currentSearchIndex, consensusRequests));
+            consensusRequests = new ArrayList<>();
+            currentSearchIndex = batch.getSearchIndex();
           }
+          consensusRequests.add(request);
+        }
+        // write last request
+        if (!consensusRequests.isEmpty()) {
+          requestsInThisBatch.add(
+              impl.buildIndexedConsensusRequestForRemoteRequest(
+                  currentSearchIndex, consensusRequests));
         }
       }
-      logger.debug("Execute TSyncLogReq for {} with result {}", req.consensusGroupId, statuses);
-      resultHandler.onComplete(new TSyncLogRes(statuses));
+      TSStatus writeStatus = impl.getStateMachine().write(requestsInThisBatch);
+      logger.debug(
+          "Execute TSyncLogReq for {} with result {}", req.consensusGroupId, writeStatus.subStatus);
+      resultHandler.onComplete(new TSyncLogRes(writeStatus.subStatus));
     } catch (Exception e) {
       resultHandler.onError(e);
     }
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
index d243a80e77..db6b03377a 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
@@ -138,7 +138,9 @@ public class SyncStatusTest {
         i++) {
       status.removeBatch(batchList.get(i));
       Assert.assertEquals(
-          config.getReplication().getMaxPendingBatch() / 2, status.getPendingBatches().size());
+          config.getReplication().getMaxPendingBatch()
+              - config.getReplication().getMaxPendingBatch() / 2,
+          status.getPendingBatches().size());
       Assert.assertEquals(
           config.getReplication().getMaxPendingBatch(), status.getNextSendingIndex());
     }
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
index eab940cf87..648c6282dd 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
@@ -30,6 +31,9 @@ import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -40,6 +44,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 public class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
 
+  private static final Logger logger = LoggerFactory.getLogger(TestStateMachine.class);
   private final RequestSets requestSets = new RequestSets(ConcurrentHashMap.newKeySet());
 
   public Set<IndexedConsensusRequest> getRequestSet() {
@@ -68,19 +73,36 @@ public class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
   @Override
   public TSStatus write(IConsensusRequest request) {
     synchronized (requestSets) {
-      IndexedConsensusRequest indexedConsensusRequest = (IndexedConsensusRequest) request;
-      List<IConsensusRequest> transformedRequest = new ArrayList<>();
-      for (IConsensusRequest innerRequest : indexedConsensusRequest.getRequests()) {
-        ByteBuffer buffer = innerRequest.serializeToByteBuffer();
-        transformedRequest.add(new TestEntry(buffer.getInt(), Peer.deserialize(buffer)));
+      if (request instanceof IndexedConsensusRequest) {
+        writeOneRequest((IndexedConsensusRequest) request);
+        return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+      } else if (request instanceof BatchIndexedConsensusRequest) {
+        BatchIndexedConsensusRequest batchIndexedConsensusRequest =
+            (BatchIndexedConsensusRequest) request;
+        List<TSStatus> subStatus = new ArrayList<>();
+        for (IndexedConsensusRequest innerRequest : batchIndexedConsensusRequest.getRequests()) {
+          writeOneRequest(innerRequest);
+          subStatus.add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+        }
+        return new TSStatus().setSubStatus(subStatus);
+      } else {
+        logger.error("Unknown request: {}", request);
+        return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR);
       }
-      requestSets.add(
-          new IndexedConsensusRequest(indexedConsensusRequest.getSearchIndex(), transformedRequest),
-          indexedConsensusRequest.getSearchIndex() != ConsensusReqReader.DEFAULT_SEARCH_INDEX);
-      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
     }
   }
 
+  private void writeOneRequest(IndexedConsensusRequest indexedConsensusRequest) {
+    List<IConsensusRequest> transformedRequest = new ArrayList<>();
+    for (IConsensusRequest innerRequest : indexedConsensusRequest.getRequests()) {
+      ByteBuffer buffer = innerRequest.serializeToByteBuffer();
+      transformedRequest.add(new TestEntry(buffer.getInt(), Peer.deserialize(buffer)));
+    }
+    requestSets.add(
+        new IndexedConsensusRequest(indexedConsensusRequest.getSearchIndex(), transformedRequest),
+        indexedConsensusRequest.getSearchIndex() != ConsensusReqReader.DEFAULT_SEARCH_INDEX);
+  }
+
   @Override
   public synchronized DataSet read(IConsensusRequest request) {
     if (request instanceof GetConsensusReqReaderPlan) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java b/node-commons/src/main/java/org/apache/iotdb/commons/StepTracker.java
similarity index 74%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/StepTracker.java
index b5e1fc88ad..98ca5a4aab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/StepTracker.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.commons;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,18 +27,24 @@ import java.util.Map;
 
 class Metric {
   private static final Logger logger = LoggerFactory.getLogger(Metric.class);
-  private static final int PRINT_RATE = 1000;
+  private static final int DEFAULT_PRINT_RATE = 1000;
 
   public String stepName;
   public long invokeCount;
   public long totalTime;
   public long lastCycleTime;
+  public int printRate;
 
   public Metric(String stepName) {
+    this(stepName, DEFAULT_PRINT_RATE);
+  }
+
+  public Metric(String stepName, int printRate) {
     this.stepName = stepName;
     this.invokeCount = 0;
     this.totalTime = 0;
     this.lastCycleTime = 0;
+    this.printRate = printRate;
   }
 
   public void trace(long startTime, long endTime) {
@@ -47,7 +53,7 @@ class Metric {
   }
 
   public void tryPrint() {
-    if (invokeCount % PRINT_RATE == 0) {
+    if (invokeCount % printRate == 0) {
       logger.info(
           String.format(
               "step metrics [%d]-[%s] - Total: %d, SUM: %.2fms, AVG: %fms, Last%dAVG: %fms",
@@ -56,8 +62,8 @@ class Metric {
               invokeCount,
               totalTime * 1.0 / 1000000,
               totalTime * 1.0 / 1000000 / invokeCount,
-              PRINT_RATE,
-              (totalTime * 1.0 - lastCycleTime) / 1000000 / PRINT_RATE));
+              printRate,
+              (totalTime * 1.0 - lastCycleTime) / 1000000 / printRate));
       lastCycleTime = totalTime;
     }
   }
@@ -74,7 +80,18 @@ public class StepTracker {
     metrics.get().get(stepName).tryPrint();
   }
 
+  public static void trace(String stepName, int printRate, long startTime, long endTime) {
+    if (metrics.get() == null) {
+      metrics.set(new HashMap<>());
+    }
+    metrics
+        .get()
+        .computeIfAbsent(stepName, key -> new Metric(stepName, printRate))
+        .trace(startTime, endTime);
+    metrics.get().get(stepName).tryPrint();
+  }
+
   public static void cleanup() {
-    metrics.set(null);
+    metrics.remove();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 818a09ab2e..fa7a4aa502 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
@@ -47,7 +48,13 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 public class DataRegionStateMachine extends BaseStateMachine {
 
@@ -58,8 +65,17 @@ public class DataRegionStateMachine extends BaseStateMachine {
 
   private DataRegion region;
 
+  private static final int MAX_REQUEST_CACHE_SIZE = 5;
+  private static final long CACHE_WINDOW_TIME_IN_MS = 10_000;
+
+  private final Lock queueLock = new ReentrantLock();
+  private final Condition queueSortCondition = queueLock.newCondition();
+  private final PriorityQueue<InsertNodeWrapper> requestCache;
+  private long nextSyncIndex = -1;
+
   public DataRegionStateMachine(DataRegion region) {
     this.region = region;
+    this.requestCache = new PriorityQueue<>();
   }
 
   @Override
@@ -104,6 +120,136 @@ public class DataRegionStateMachine extends BaseStateMachine {
     }
   }
 
+  /**
+   * Writes a MultiLeader SyncLog batch on the follower in the same order as on the leader: the
+   * batch waits in requestCache until it starts at nextSyncIndex, heads a full cache, or times
+   * out as the head. Batches are deserialized by the caller first, so that work can be concurrent
+   */
+  private TSStatus cacheAndInsertLatestNode(InsertNodeWrapper insertNodeWrapper) {
+    queueLock.lock();
+    try {
+      requestCache.add(insertNodeWrapper);
+      // If the queue is full and its peek is not held by the current thread, notify the waiting
+      // threads so that the one holding the peek can process it
+      if (requestCache.size() == MAX_REQUEST_CACHE_SIZE
+          && requestCache.peek().getStartSyncIndex() != insertNodeWrapper.getStartSyncIndex()) {
+        queueSortCondition.signalAll();
+      }
+      while (true) {
+        // If the current batch starts at nextSyncIndex, it is the next target: write it
+        if (insertNodeWrapper.getStartSyncIndex() == nextSyncIndex) {
+          requestCache.remove(insertNodeWrapper);
+          nextSyncIndex = insertNodeWrapper.getEndSyncIndex() + 1;
+          break;
+        }
+        // If no write thread hits nextSyncIndex and the heap is full, write the peek
+        // request. This keeps the overall write order correct when nextSyncIndex is not
+        // set. The value of nextSyncIndex is not persisted, to reduce complexity.
+        // There are some cases in which nextSyncIndex is not set:
+        //   1. When the system has just started
+        //   2. When some exception occurs during SyncLog
+        if (requestCache.size() == MAX_REQUEST_CACHE_SIZE
+            && requestCache.peek().getStartSyncIndex() == insertNodeWrapper.getStartSyncIndex()) {
+          requestCache.remove();
+          nextSyncIndex = insertNodeWrapper.getEndSyncIndex() + 1;
+          break;
+        }
+        try {
+          boolean timeout =
+              !queueSortCondition.await(CACHE_WINDOW_TIME_IN_MS, TimeUnit.MILLISECONDS);
+          if (timeout) {
+            // Although the timeout was triggered, the current thread cannot write its request
+            // unless it holds the peek request; otherwise some other thread should hold the
+            // peek request. In that scenario the current thread goes into await again and
+            // waits until its own request becomes the peek request
+            if (requestCache.peek().getStartSyncIndex() == insertNodeWrapper.getStartSyncIndex()) {
+              // current thread holds the peek, so it writes now; nextSyncIndex is left unchanged
+              logger.info(
+                  "waiting target request timeout. current index: {}, target index: {}",
+                  insertNodeWrapper.getStartSyncIndex(),
+                  nextSyncIndex);
+              requestCache.remove(insertNodeWrapper);
+              break;
+            }
+          }
+        } catch (InterruptedException e) {
+          logger.warn(
+              "current waiting is interrupted. SyncIndex: {}. Exception: {}",
+              insertNodeWrapper.getStartSyncIndex(),
+              e);
+          Thread.currentThread().interrupt(); // NOTE(review): loop continues with the flag set, so the next await() throws again at once - confirm this cannot spin
+        }
+      }
+      logger.debug(
+          "region = {}, queue size {}, startSyncIndex = {}, endSyncIndex = {}",
+          region.getDataRegionId(),
+          requestCache.size(),
+          insertNodeWrapper.getStartSyncIndex(),
+          insertNodeWrapper.getEndSyncIndex());
+      List<TSStatus> subStatus = new LinkedList<>();
+      for (InsertNode insertNode : insertNodeWrapper.getInsertNodes()) {
+        subStatus.add(write(insertNode)); // executed while queueLock is still held
+      }
+      queueSortCondition.signalAll(); // wake the waiters so they re-check the loop conditions
+      return new TSStatus().setSubStatus(subStatus);
+    } finally {
+      queueLock.unlock();
+    }
+  }
+
+  private static class InsertNodeWrapper implements Comparable<InsertNodeWrapper> {
+    private final long startSyncIndex; // first sync index covered by the wrapped batch
+    private final long endSyncIndex; // last sync index covered by the wrapped batch
+    private final List<InsertNode> insertNodes; // appended via add(), kept in insertion order
+
+    public InsertNodeWrapper(long startSyncIndex, long endSyncIndex) {
+      this.startSyncIndex = startSyncIndex;
+      this.endSyncIndex = endSyncIndex;
+      this.insertNodes = new LinkedList<>();
+    }
+
+    @Override
+    public int compareTo(InsertNodeWrapper o) {
+      return Long.compare(startSyncIndex, o.startSyncIndex); // ascending by startSyncIndex only
+    }
+
+    public void add(InsertNode insertNode) {
+      this.insertNodes.add(insertNode);
+    }
+
+    public long getStartSyncIndex() {
+      return startSyncIndex;
+    }
+
+    public long getEndSyncIndex() {
+      return endSyncIndex;
+    }
+
+    public List<InsertNode> getInsertNodes() {
+      return insertNodes;
+    }
+  }
+
+  private InsertNodeWrapper deserializeAndWrap(BatchIndexedConsensusRequest batchRequest) {
+    InsertNodeWrapper insertNodeWrapper =
+        new InsertNodeWrapper(batchRequest.getStartSyncIndex(), batchRequest.getEndSyncIndex());
+    for (IndexedConsensusRequest indexedRequest : batchRequest.getRequests()) {
+      insertNodeWrapper.add(grabInsertNode(indexedRequest)); // one merged InsertNode per request
+    }
+    return insertNodeWrapper;
+  }
+
+  private InsertNode grabInsertNode(IndexedConsensusRequest indexedRequest) {
+    List<InsertNode> insertNodes = new ArrayList<>(indexedRequest.getRequests().size());
+    for (IConsensusRequest req : indexedRequest.getRequests()) {
+      // Every PlanNode in an IndexedConsensusRequest is expected to be an InsertNode (unchecked)
+      InsertNode innerNode = (InsertNode) getPlanNode(req);
+      innerNode.setSearchIndex(indexedRequest.getSearchIndex()); // same index for every inner node
+      insertNodes.add(innerNode);
+    }
+    return mergeInsertNodes(insertNodes);
+  }
+
   @Override
   public List<File> getSnapshotFiles(File latestSnapshotRootDir) {
     // TODO: implement this method
@@ -116,14 +262,11 @@ public class DataRegionStateMachine extends BaseStateMachine {
     try {
       if (request instanceof IndexedConsensusRequest) {
         IndexedConsensusRequest indexedRequest = (IndexedConsensusRequest) request;
-        List<InsertNode> insertNodes = new ArrayList<>(indexedRequest.getRequests().size());
-        for (IConsensusRequest req : indexedRequest.getRequests()) {
-          // PlanNode in IndexedConsensusRequest should always be InsertNode
-          InsertNode innerNode = (InsertNode) getPlanNode(req);
-          innerNode.setSearchIndex(indexedRequest.getSearchIndex());
-          insertNodes.add(innerNode);
-        }
-        planNode = mergeInsertNodes(insertNodes);
+        planNode = grabInsertNode(indexedRequest);
+      } else if (request instanceof BatchIndexedConsensusRequest) {
+        InsertNodeWrapper insertNodeWrapper =
+            deserializeAndWrap((BatchIndexedConsensusRequest) request);
+        return cacheAndInsertLatestNode(insertNodeWrapper);
       } else {
         planNode = getPlanNode(request);
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index 1a0a74de22..bf1b1fcdaf 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -88,6 +88,9 @@ public class WALNode implements IWALNode {
   /** no multi-leader consensus, all insert nodes can be safely deleted */
   public static final long DEFAULT_SAFELY_DELETED_SEARCH_INDEX = Long.MAX_VALUE;
 
+  /** timeout threshold, in seconds, when waiting for the next WAL entry */
+  private static final long WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC = 30;
+
   /** unique identifier of this WALNode */
   private final String identifier;
   /** directory to store this node's files */
@@ -680,7 +683,12 @@ public class WALNode implements IWALNode {
     @Override
     public void waitForNextReady() throws InterruptedException {
       while (!hasNext()) {
-        buffer.waitForFlush();
+        boolean timeout =
+            !buffer.waitForFlush(WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC, TimeUnit.SECONDS);
+        if (timeout) { // waitForFlush returned false, which this code treats as a timeout
+          logger.info("timeout when waiting for next WAL entry ready, execute rollWALFile");
+          rollWALFile();
+        }
       }
     }