You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/19 15:47:36 UTC

[iotdb] branch master updated: [IOTDB-3791] Avoid redundant serialization and deserialization in multi-leader (#6655)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c39d956b1 [IOTDB-3791] Avoid redundant serialization and deserialization in multi-leader (#6655)
1c39d956b1 is described below

commit 1c39d956b113c690f1d9e36598f55a704b7f2383
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Tue Jul 19 23:47:30 2022 +0800

    [IOTDB-3791] Avoid redundant serialization and deserialization in multi-leader (#6655)
---
 .../common/request/IndexedConsensusRequest.java    |  21 +-
 .../request/MultiLeaderConsensusRequest.java       |  34 +-
 .../iotdb/consensus/config/MultiLeaderConfig.java  |   2 +-
 .../multileader/MultiLeaderServerImpl.java         |   9 +-
 .../multileader/logdispatcher/LogDispatcher.java   |   9 +-
 .../service/MultiLeaderRPCServiceProcessor.java    |   9 +-
 .../multileader/wal/ConsensusReqReader.java        |  20 --
 .../multileader/util/FakeConsensusReqReader.java   |  19 --
 .../consensus/multileader/util/TestEntry.java      |   8 +-
 .../multileader/util/TestStateMachine.java         |  29 +-
 .../consensus/statemachine/BaseStateMachine.java   |   4 +
 .../statemachine/DataRegionStateMachine.java       |  21 +-
 .../mpp/plan/planner/plan/node/PlanNodeType.java   |  22 +-
 .../plan/planner/plan/node/write/InsertNode.java   |   7 +
 .../planner/plan/node/write/InsertRowNode.java     |  84 ++++-
 .../planner/plan/node/write/InsertTabletNode.java  |  56 +++-
 .../java/org/apache/iotdb/db/tools/WalChecker.java |  27 +-
 .../iotdb/db/wal/buffer/IWALByteBufferView.java    |   3 +
 .../org/apache/iotdb/db/wal/buffer/WALBuffer.java  |  98 +++---
 .../org/apache/iotdb/db/wal/buffer/WALEntry.java   | 123 +++----
 .../apache/iotdb/db/wal/buffer/WALEntryType.java   |  14 +-
 .../apache/iotdb/db/wal/buffer/WALEntryValue.java  |   5 +-
 .../apache/iotdb/db/wal/buffer/WALInfoEntry.java   | 107 ++++++
 .../{SignalWALEntry.java => WALSignalEntry.java}   |  47 ++-
 .../java/org/apache/iotdb/db/wal/io/LogWriter.java |   9 +-
 .../apache/iotdb/db/wal/io/WALByteBufReader.java   |  95 ++++++
 .../org/apache/iotdb/db/wal/io/WALMetaData.java    |  96 ++++++
 .../java/org/apache/iotdb/db/wal/io/WALReader.java |  46 +--
 .../java/org/apache/iotdb/db/wal/io/WALWriter.java |  44 +++
 .../org/apache/iotdb/db/wal/node/WALFakeNode.java  |  13 -
 .../java/org/apache/iotdb/db/wal/node/WALNode.java | 319 ++++--------------
 .../iotdb/db/wal/recover/WALNodeRecoverTask.java   |  19 +-
 .../iotdb/db/wal/recover/WALRecoverWriter.java     |  63 ++++
 .../plan/node/write/InsertRowNodeSerdeTest.java    |   2 +-
 .../plan/node/write/InsertTabletNodeSerdeTest.java |   2 +-
 .../org/apache/iotdb/db/tools/WalCheckerTest.java  |  15 +-
 .../iotdb/db/wal/buffer/WALBufferCommonTest.java   |   2 +-
 .../org/apache/iotdb/db/wal/io/WALFileTest.java    |  21 +-
 .../iotdb/db/wal/node/ConsensusReqReaderTest.java  | 367 ++++++++++-----------
 .../db/wal/recover/WALRecoverManagerTest.java      |  11 +-
 .../file/UnsealedTsFileRecoverPerformerTest.java   |   9 +-
 .../iotdb/db/wal/utils/WALByteBufferForTest.java   |   5 +
 .../src/main/thrift/mutlileader.thrift             |   1 +
 43 files changed, 1121 insertions(+), 796 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index 0eaade43f5..de7ac07250 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.consensus.common.request;
 import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
 
 import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.Objects;
 
 /** only used for multi-leader consensus. */
@@ -32,26 +33,26 @@ public class IndexedConsensusRequest implements IConsensusRequest {
 
   private final long safelyDeletedSearchIndex;
 
-  private final IConsensusRequest request;
+  private final List<IConsensusRequest> requests;
 
-  public IndexedConsensusRequest(long searchIndex, IConsensusRequest request) {
-    this(searchIndex, ConsensusReqReader.DEFAULT_SAFELY_DELETED_SEARCH_INDEX, request);
+  public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> requests) {
+    this(searchIndex, ConsensusReqReader.DEFAULT_SAFELY_DELETED_SEARCH_INDEX, requests);
   }
 
   public IndexedConsensusRequest(
-      long searchIndex, long safelyDeletedSearchIndex, IConsensusRequest request) {
+      long searchIndex, long safelyDeletedSearchIndex, List<IConsensusRequest> requests) {
     this.searchIndex = searchIndex;
     this.safelyDeletedSearchIndex = safelyDeletedSearchIndex;
-    this.request = request;
+    this.requests = requests;
   }
 
   @Override
   public ByteBuffer serializeToByteBuffer() {
-    return request.serializeToByteBuffer();
+    throw new UnsupportedOperationException();
   }
 
-  public IConsensusRequest getRequest() {
-    return request;
+  public List<IConsensusRequest> getRequests() {
+    return requests;
   }
 
   public long getSearchIndex() {
@@ -73,11 +74,11 @@ public class IndexedConsensusRequest implements IConsensusRequest {
     IndexedConsensusRequest that = (IndexedConsensusRequest) o;
     return searchIndex == that.searchIndex
         && safelyDeletedSearchIndex == that.safelyDeletedSearchIndex
-        && Objects.equals(request, that.request);
+        && requests.equals(that.requests);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(searchIndex, safelyDeletedSearchIndex, request);
+    return Objects.hash(searchIndex, safelyDeletedSearchIndex, requests);
   }
 }
diff --git a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/MultiLeaderConsensusRequest.java
similarity index 53%
copy from thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
copy to consensus/src/main/java/org/apache/iotdb/consensus/common/request/MultiLeaderConsensusRequest.java
index 5e2836c5a6..10c761743d 100644
--- a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/MultiLeaderConsensusRequest.java
@@ -17,22 +17,26 @@
  * under the License.
  */
 
-include "common.thrift"
-namespace java org.apache.iotdb.consensus.multileader.thrift
+package org.apache.iotdb.consensus.common.request;
 
-struct TLogBatch {
-  1: required binary data
-}
+import java.nio.ByteBuffer;
 
-struct TSyncLogReq {
-  1: required common.TConsensusGroupId consensusGroupId
-  2: required list<TLogBatch> batches
-}
+/**
+ * This class is used to represent the sync log request from MultiLeaderConsensus. We use this
+ * class rather than ByteBufferConsensusRequest because the serialization method differs between
+ * these two classes, and we need to distinguish them in DataRegionStateMachine when
+ * deserializing the PlanNode from the ByteBuffer.
+ */
+public class MultiLeaderConsensusRequest implements IConsensusRequest {
 
-struct TSyncLogRes {
-  1: required list<common.TSStatus> status
-}
+  private final ByteBuffer byteBuffer;
+
+  public MultiLeaderConsensusRequest(ByteBuffer byteBuffer) {
+    this.byteBuffer = byteBuffer;
+  }
 
-service MultiLeaderConsensusIService {
-  TSyncLogRes syncLog(TSyncLogReq req)
-}
\ No newline at end of file
+  @Override
+  public ByteBuffer serializeToByteBuffer() {
+    return byteBuffer;
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index 06cc2e453c..e4f66f557b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -249,7 +249,7 @@ public class MultiLeaderConfig {
     public static class Builder {
       private int maxPendingRequestNumPerNode = 200;
       private int maxRequestPerBatch = 40;
-      private int maxPendingBatch = 6;
+      private int maxPendingBatch = 1;
       private int maxWaitingTimeForAccumulatingBatchInMs = 500;
       private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
       private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 2c21beba3e..a0439bf8c9 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.config.MultiLeaderConfig;
@@ -45,6 +44,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -181,12 +181,13 @@ public class MultiLeaderServerImpl {
 
   public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(
       IConsensusRequest request) {
-    return new IndexedConsensusRequest(index.incrementAndGet(), request);
+    return new IndexedConsensusRequest(index.incrementAndGet(), Collections.singletonList(request));
   }
 
   public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest(
-      ByteBufferConsensusRequest request) {
-    return new IndexedConsensusRequest(ConsensusReqReader.DEFAULT_SEARCH_INDEX, request);
+      IConsensusRequest request) {
+    return new IndexedConsensusRequest(
+        ConsensusReqReader.DEFAULT_SEARCH_INDEX, Collections.singletonList(request));
   }
 
   /**
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 0a65148fe3..8ab54482f1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.config.MultiLeaderConfig;
 import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
@@ -337,7 +338,9 @@ public class LogDispatcher {
         IndexedConsensusRequest data = walEntryiterator.next();
         currentIndex = data.getSearchIndex();
         iteratorIndex = currentIndex;
-        logBatches.add(new TLogBatch(data.serializeToByteBuffer()));
+        for (IConsensusRequest innerRequest : data.getRequests()) {
+          logBatches.add(new TLogBatch(innerRequest.serializeToByteBuffer(), true));
+        }
         if (currentIndex == maxIndex - 1) {
           break;
         }
@@ -347,7 +350,9 @@ public class LogDispatcher {
 
     private void constructBatchIndexedFromConsensusRequest(
         IndexedConsensusRequest request, List<TLogBatch> logBatches) {
-      logBatches.add(new TLogBatch(request.serializeToByteBuffer()));
+      for (IConsensusRequest innerRequest : request.getRequests()) {
+        logBatches.add(new TLogBatch(innerRequest.serializeToByteBuffer(), false));
+      }
     }
   }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 196bf120ff..b83c900892 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.consensus.multileader.service;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
 import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
 import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
 import org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
@@ -30,7 +31,6 @@ import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq;
 import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,8 +50,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
   }
 
   @Override
-  public void syncLog(TSyncLogReq req, AsyncMethodCallback<TSyncLogRes> resultHandler)
-      throws TException {
+  public void syncLog(TSyncLogReq req, AsyncMethodCallback<TSyncLogRes> resultHandler) {
     try {
       ConsensusGroupId groupId =
           ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
@@ -75,7 +74,9 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
               impl.getStateMachine()
                   .write(
                       impl.buildIndexedConsensusRequestForRemoteRequest(
-                          new ByteBufferConsensusRequest(batch.data))));
+                          batch.isFromWAL()
+                              ? new MultiLeaderConsensusRequest(batch.data)
+                              : new ByteBufferConsensusRequest(batch.data))));
         }
       }
       logger.debug("Execute TSyncLogReq for {} with result {}", req.consensusGroupId, statuses);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java
index ea462924e4..b3ad6ec09f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java
@@ -19,11 +19,9 @@
 
 package org.apache.iotdb.consensus.multileader.wal;
 
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 
 import java.util.Iterator;
-import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -38,24 +36,6 @@ public interface ConsensusReqReader {
 
   void setSafelyDeletedSearchIndex(long safelyDeletedSearchIndex);
 
-  /**
-   * Gets the consensus request at the specified position.
-   *
-   * @param index index of the consensus request to return
-   * @return the consensus request at the specified position, null if the request doesn't exist
-   */
-  IConsensusRequest getReq(long index);
-
-  /**
-   * Gets the consensus requests from the specified start position.
-   *
-   * @param startIndex index of the start consensus request
-   * @param num number of consensus requests to return, the number of actual returned consensus
-   *     requests may less than this value
-   * @return the consensus requests from the specified start position
-   */
-  List<IConsensusRequest> getReqs(long startIndex, int num);
-
   /**
    * Gets the consensus requests iterator from the specified start position.
    *
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/FakeConsensusReqReader.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/FakeConsensusReqReader.java
index a7f7134940..334d5a4fc6 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/FakeConsensusReqReader.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/FakeConsensusReqReader.java
@@ -20,11 +20,9 @@
 package org.apache.iotdb.consensus.multileader.util;
 
 import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
 
-import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -39,23 +37,6 @@ public class FakeConsensusReqReader implements ConsensusReqReader, DataSet {
   @Override
   public void setSafelyDeletedSearchIndex(long safelyDeletedSearchIndex) {}
 
-  @Override
-  public IConsensusRequest getReq(long index) {
-    synchronized (requestSets) {
-      for (IndexedConsensusRequest indexedConsensusRequest : requestSets.getRequestSet()) {
-        if (indexedConsensusRequest.getSearchIndex() == index) {
-          return indexedConsensusRequest;
-        }
-      }
-      return null;
-    }
-  }
-
-  @Override
-  public List<IConsensusRequest> getReqs(long startIndex, int num) {
-    return null;
-  }
-
   @Override
   public ReqIterator getReqIterator(long startIndex) {
     return new FakeConsensusReqIterator(startIndex);
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestEntry.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestEntry.java
index 214af5a8d6..874aa9b7f1 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestEntry.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestEntry.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.consensus.multileader.util;
 
 import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import java.io.DataOutputStream;
@@ -28,14 +28,18 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
-public class TestEntry implements IConsensusRequest {
+public class TestEntry extends MultiLeaderConsensusRequest {
 
   private final int num;
   private final Peer peer;
 
   public TestEntry(int num, Peer peer) {
+    super(ByteBuffer.allocate(Integer.BYTES));
     this.num = num;
     this.peer = peer;
+    ByteBuffer buffer = super.serializeToByteBuffer();
+    buffer.putInt(num);
+    buffer.clear();
   }
 
   @Override
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
index 1a9507da07..eab940cf87 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
@@ -23,16 +23,18 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
 import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import java.io.File;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -46,7 +48,14 @@ public class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
 
   public Set<TestEntry> getData() {
     Set<TestEntry> data = new HashSet<>();
-    requestSets.getRequestSet().forEach(x -> data.add((TestEntry) x.getRequest()));
+    requestSets
+        .getRequestSet()
+        .forEach(
+            x -> {
+              for (IConsensusRequest request : x.getRequests()) {
+                data.add((TestEntry) request);
+              }
+            });
     return data;
   }
 
@@ -59,17 +68,15 @@ public class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
   @Override
   public TSStatus write(IConsensusRequest request) {
     synchronized (requestSets) {
-      IConsensusRequest innerRequest = ((IndexedConsensusRequest) request).getRequest();
-      if (innerRequest instanceof ByteBufferConsensusRequest) {
+      IndexedConsensusRequest indexedConsensusRequest = (IndexedConsensusRequest) request;
+      List<IConsensusRequest> transformedRequest = new ArrayList<>();
+      for (IConsensusRequest innerRequest : indexedConsensusRequest.getRequests()) {
         ByteBuffer buffer = innerRequest.serializeToByteBuffer();
-        requestSets.add(
-            new IndexedConsensusRequest(
-                ((IndexedConsensusRequest) request).getSearchIndex(),
-                new TestEntry(buffer.getInt(), Peer.deserialize(buffer))),
-            false);
-      } else {
-        requestSets.add(((IndexedConsensusRequest) request), true);
+        transformedRequest.add(new TestEntry(buffer.getInt(), Peer.deserialize(buffer)));
       }
+      requestSets.add(
+          new IndexedConsensusRequest(indexedConsensusRequest.getSearchIndex(), transformedRequest),
+          indexedConsensusRequest.getSearchIndex() != ConsensusReqReader.DEFAULT_SEARCH_INDEX);
       return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
index 6d8dc5f62b..32333f26e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -22,9 +22,11 @@ package org.apache.iotdb.db.consensus.statemachine;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.wal.buffer.WALEntry;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,6 +52,8 @@ public abstract class BaseStateMachine implements IStateMachine, IStateMachine.E
     PlanNode node;
     if (request instanceof ByteBufferConsensusRequest) {
       node = PlanNodeType.deserialize(request.serializeToByteBuffer());
+    } else if (request instanceof MultiLeaderConsensusRequest) {
+      node = WALEntry.deserializeInsertNode(request.serializeToByteBuffer());
     } else if (request instanceof PlanNode) {
       node = (PlanNode) request;
     } else {
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 8ed5bbb7cd..e5d9c2b6f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -98,19 +98,30 @@ public class DataRegionStateMachine extends BaseStateMachine {
 
   @Override
   public TSStatus write(IConsensusRequest request) {
+    TSStatus status;
     PlanNode planNode;
     try {
       if (request instanceof IndexedConsensusRequest) {
+        status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
         IndexedConsensusRequest indexedConsensusRequest = (IndexedConsensusRequest) request;
-        planNode = getPlanNode(indexedConsensusRequest.getRequest());
-        if (planNode instanceof InsertNode) {
-          ((InsertNode) planNode)
-              .setSearchIndex(((IndexedConsensusRequest) request).getSearchIndex());
+        for (IConsensusRequest innerRequest : indexedConsensusRequest.getRequests()) {
+          planNode = getPlanNode(innerRequest);
+          if (planNode instanceof InsertNode) {
+            ((InsertNode) planNode)
+                .setSearchIndex(((IndexedConsensusRequest) request).getSearchIndex());
+          }
+          TSStatus subStatus = write(planNode);
+          if (subStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            status.setCode(subStatus.getCode());
+            status.setMessage(subStatus.getMessage());
+          }
+          status.addToSubStatus(subStatus);
         }
       } else {
         planNode = getPlanNode(request);
+        status = write(planNode);
       }
-      return write(planNode);
+      return status;
     } catch (IllegalArgumentException e) {
       logger.error(e.getMessage(), e);
       return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index d31db09d34..4c176d83b2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.mpp.plan.planner.plan.node;
 
-import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.CountSchemaMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.DevicesCountNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.DevicesSchemaScanNode;
@@ -127,6 +126,8 @@ public enum PlanNodeType {
   NODE_PATHS_COUNT((short) 49),
   INTERNAL_CREATE_TIMESERIES((short) 50);
 
+  public static final int BYTES = Short.BYTES;
+
   private final short nodeType;
 
   PlanNodeType(short nodeType) {
@@ -141,14 +142,25 @@ public enum PlanNodeType {
     ReadWriteIOUtils.write(nodeType, stream);
   }
 
-  public static PlanNode deserialize(DataInputStream stream)
-      throws IOException, IllegalPathException {
+  public static PlanNode deserializeFromWAL(DataInputStream stream) throws IOException {
     short nodeType = stream.readShort();
     switch (nodeType) {
       case 13:
-        return InsertTabletNode.deserialize(stream);
+        return InsertTabletNode.deserializeFromWAL(stream);
+      case 14:
+        return InsertRowNode.deserializeFromWAL(stream);
+      default:
+        throw new IllegalArgumentException("Invalid node type: " + nodeType);
+    }
+  }
+
+  public static PlanNode deserializeFromWAL(ByteBuffer buffer) {
+    short nodeType = buffer.getShort();
+    switch (nodeType) {
+      case 13:
+        return InsertTabletNode.deserializeFromWAL(buffer);
       case 14:
-        return InsertRowNode.deserialize(stream);
+        return InsertRowNode.deserializeFromWAL(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index 9b4fe72f0c..f5b5bddb52 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -203,6 +203,13 @@ public abstract class InsertNode extends WritePlanNode {
       measurements[i] = measurementSchemas[i].getMeasurementId();
     }
   }
+
+  protected void deserializeMeasurementSchemas(ByteBuffer buffer) {
+    for (int i = 0; i < measurementSchemas.length; i++) {
+      measurementSchemas[i] = MeasurementSchema.deserializeFrom(buffer);
+      measurements[i] = measurementSchemas[i].getMeasurementId();
+    }
+  }
   // endregion
 
   public TRegionReplicaSet getRegionReplicaSet() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
index ef03fcd046..883fcc30c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
@@ -569,6 +569,10 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
     return size;
   }
 
+  /**
+   * Compared with {@link #serialize(ByteBuffer)}, this writes more info (the search index) and
+   * less info (no isNeedInferType).
+   */
   @Override
   public void serializeToWAL(IWALByteBufferView buffer) {
     buffer.putShort((short) PlanNodeType.INSERT_ROW.ordinal());
@@ -629,19 +633,22 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
   }
 
   /** Deserialize from wal */
-  public static InsertRowNode deserialize(DataInputStream stream)
-      throws IOException, IllegalPathException {
+  public static InsertRowNode deserializeFromWAL(DataInputStream stream) throws IOException {
     // we do not store plan node id in wal entry
     InsertRowNode insertNode = new InsertRowNode(new PlanNodeId(""));
     insertNode.setSearchIndex(stream.readLong());
     insertNode.setTime(stream.readLong());
-    insertNode.setDevicePath(new PartialPath(ReadWriteIOUtils.readString(stream)));
-    insertNode.deserializeMeasurementsAndValues(stream);
+    try {
+      insertNode.setDevicePath(new PartialPath(ReadWriteIOUtils.readString(stream)));
+    } catch (IllegalPathException e) {
+      throw new IllegalArgumentException("Cannot deserialize InsertRowNode", e);
+    }
+    insertNode.deserializeMeasurementsAndValuesFromWAL(stream);
 
     return insertNode;
   }
 
-  void deserializeMeasurementsAndValues(DataInputStream stream) throws IOException {
+  void deserializeMeasurementsAndValuesFromWAL(DataInputStream stream) throws IOException {
     int measurementSize = stream.readInt();
 
     measurements = new String[measurementSize];
@@ -650,13 +657,13 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
 
     dataTypes = new TSDataType[measurementSize];
     values = new Object[measurementSize];
-    fillDataTypesAndValues(stream);
+    fillDataTypesAndValuesFromWAL(stream);
 
     isAligned = stream.readByte() == 1;
   }
 
   /** Make sure the dataTypes and values have been created before calling this */
-  public void fillDataTypesAndValues(DataInputStream stream) throws IOException {
+  public void fillDataTypesAndValuesFromWAL(DataInputStream stream) throws IOException {
     for (int i = 0; i < dataTypes.length; i++) {
       byte typeNum = stream.readByte();
       if (typeNum == TYPE_NULL) {
@@ -687,6 +694,69 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
       }
     }
   }
+
+  /** Deserialize from wal */
+  public static InsertRowNode deserializeFromWAL(ByteBuffer buffer) {
+    // we do not store plan node id in wal entry
+    InsertRowNode insertNode = new InsertRowNode(new PlanNodeId(""));
+    insertNode.setSearchIndex(buffer.getLong());
+    insertNode.setTime(buffer.getLong());
+    try {
+      insertNode.setDevicePath(new PartialPath(ReadWriteIOUtils.readString(buffer)));
+    } catch (IllegalPathException e) {
+      throw new IllegalArgumentException("Cannot deserialize InsertRowNode", e);
+    }
+    insertNode.deserializeMeasurementsAndValuesFromWAL(buffer);
+
+    return insertNode;
+  }
+
+  void deserializeMeasurementsAndValuesFromWAL(ByteBuffer buffer) {
+    int measurementSize = buffer.getInt();
+
+    measurements = new String[measurementSize];
+    measurementSchemas = new MeasurementSchema[measurementSize];
+    deserializeMeasurementSchemas(buffer);
+
+    dataTypes = new TSDataType[measurementSize];
+    values = new Object[measurementSize];
+    fillDataTypesAndValuesFromWAL(buffer);
+
+    isAligned = buffer.get() == 1;
+  }
+
+  /** Make sure the dataTypes and values have been created before calling this */
+  public void fillDataTypesAndValuesFromWAL(ByteBuffer buffer) {
+    for (int i = 0; i < dataTypes.length; i++) {
+      byte typeNum = buffer.get();
+      if (typeNum == TYPE_NULL) {
+        continue;
+      }
+      dataTypes[i] = TSDataType.values()[typeNum];
+      switch (dataTypes[i]) {
+        case BOOLEAN:
+          values[i] = ReadWriteIOUtils.readBool(buffer);
+          break;
+        case INT32:
+          values[i] = ReadWriteIOUtils.readInt(buffer);
+          break;
+        case INT64:
+          values[i] = ReadWriteIOUtils.readLong(buffer);
+          break;
+        case FLOAT:
+          values[i] = ReadWriteIOUtils.readFloat(buffer);
+          break;
+        case DOUBLE:
+          values[i] = ReadWriteIOUtils.readDouble(buffer);
+          break;
+        case TEXT:
+          values[i] = ReadWriteIOUtils.readBinary(buffer);
+          break;
+        default:
+          throw new RuntimeException("Unsupported data type:" + dataTypes[i]);
+      }
+    }
+  }
   // endregion
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
index c4c447c039..a9e64c9db9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
@@ -780,6 +780,10 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
     return size;
   }
 
+  /**
+   * Compared with {@link #serialize(ByteBuffer)}, more info: search index and data types, less
+   * info: isNeedInferType
+   */
   @Override
   public void serializeToWAL(IWALByteBufferView buffer) {
     serializeToWAL(buffer, 0, rowCount);
@@ -894,17 +898,20 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
   }
 
   /** Deserialize from wal */
-  public static InsertTabletNode deserialize(DataInputStream stream)
-      throws IllegalPathException, IOException {
+  public static InsertTabletNode deserializeFromWAL(DataInputStream stream) throws IOException {
     // we do not store plan node id in wal entry
     InsertTabletNode insertNode = new InsertTabletNode(new PlanNodeId(""));
-    insertNode.subDeserialize(stream);
+    insertNode.subDeserializeFromWAL(stream);
     return insertNode;
   }
 
-  private void subDeserialize(DataInputStream stream) throws IllegalPathException, IOException {
+  private void subDeserializeFromWAL(DataInputStream stream) throws IOException {
     searchIndex = stream.readLong();
-    devicePath = new PartialPath(ReadWriteIOUtils.readString(stream));
+    try {
+      devicePath = new PartialPath(ReadWriteIOUtils.readString(stream));
+    } catch (IllegalPathException e) {
+      throw new IllegalArgumentException("Cannot deserialize InsertTabletNode", e);
+    }
 
     int measurementSize = stream.readInt();
     measurements = new String[measurementSize];
@@ -929,6 +936,45 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
         QueryDataSetUtils.readTabletValuesFromStream(stream, dataTypes, measurementSize, rowCount);
     isAligned = stream.readByte() == 1;
   }
+
+  public static InsertTabletNode deserializeFromWAL(ByteBuffer buffer) {
+    // we do not store plan node id in wal entry
+    InsertTabletNode insertNode = new InsertTabletNode(new PlanNodeId(""));
+    insertNode.subDeserializeFromWAL(buffer);
+    return insertNode;
+  }
+
+  private void subDeserializeFromWAL(ByteBuffer buffer) {
+    searchIndex = buffer.getLong();
+    try {
+      devicePath = new PartialPath(ReadWriteIOUtils.readString(buffer));
+    } catch (IllegalPathException e) {
+      throw new IllegalArgumentException("Cannot deserialize InsertTabletNode", e);
+    }
+
+    int measurementSize = buffer.getInt();
+    measurements = new String[measurementSize];
+    measurementSchemas = new MeasurementSchema[measurementSize];
+    deserializeMeasurementSchemas(buffer);
+
+    // data types are serialized in measurement schemas
+    dataTypes = new TSDataType[measurementSize];
+    for (int i = 0; i < measurementSize; i++) {
+      dataTypes[i] = measurementSchemas[i].getType();
+    }
+
+    rowCount = buffer.getInt();
+    // no pre-allocation: the array returned by readTimesFromBuffer is assigned to times directly
+    times = QueryDataSetUtils.readTimesFromBuffer(buffer, rowCount);
+
+    boolean hasBitMaps = BytesUtils.byteToBool(buffer.get());
+    if (hasBitMaps) {
+      bitMaps = QueryDataSetUtils.readBitMapsFromBuffer(buffer, measurementSize, rowCount);
+    }
+    columns =
+        QueryDataSetUtils.readTabletValuesFromBuffer(buffer, dataTypes, measurementSize, rowCount);
+    isAligned = buffer.get() == 1;
+  }
   // endregion
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/WalChecker.java b/server/src/main/java/org/apache/iotdb/db/tools/WalChecker.java
index 9c4da773e2..116ee01b4f 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/WalChecker.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/WalChecker.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.db.tools;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.db.exception.SystemCheckException;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.wal.buffer.WALEntry;
+import org.apache.iotdb.db.wal.buffer.WALEntryType;
 import org.apache.iotdb.db.wal.utils.WALFileUtils;
 
 import org.slf4j.Logger;
@@ -30,13 +30,11 @@ import org.slf4j.LoggerFactory;
 
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
-import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -89,26 +87,14 @@ public class WalChecker {
   }
 
   private boolean checkFile(File walFile) {
-    int totalSize = 0;
     try (DataInputStream logStream =
         new DataInputStream(new BufferedInputStream(new FileInputStream(walFile)))) {
       while (logStream.available() > 0) {
         WALEntry walEntry = WALEntry.deserialize(logStream);
-        totalSize += walEntry.serializedSize();
-        if (walEntry.getValue() instanceof InsertTabletNode) {
-          InsertTabletNode insertNode = (InsertTabletNode) walEntry.getValue();
-          System.err.printf(
-              "searchIndex: %s, timestamp: %s%n",
-              insertNode.getSearchIndex(), Arrays.toString(insertNode.getTimes()));
+        if (walEntry.getType() == WALEntryType.WAL_FILE_INFO_END_MARKER) {
+          return true;
         }
       }
-    } catch (EOFException e) {
-      if (totalSize == walFile.length()) {
-        return true;
-      } else {
-        logger.error("{} fails the check because", walFile, e);
-        return false;
-      }
     } catch (FileNotFoundException e) {
       logger.debug("Wal file doesn't exist, skipping");
       return true;
@@ -130,7 +116,12 @@ public class WalChecker {
 
   /** @param args walRootDirectory */
   public static void main(String[] args) throws SystemCheckException {
-    WalChecker checker = new WalChecker("/Users/heimingz/Desktop/0");
+    if (args.length < 1) {
+      logger.error("No enough args: require the walRootDirectory");
+      return;
+    }
+
+    WalChecker checker = new WalChecker(args[0]);
     List<File> files = checker.doCheck();
     report(files);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALByteBufferView.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALByteBufferView.java
index 977ac51dfc..f78355be54 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALByteBufferView.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALByteBufferView.java
@@ -49,4 +49,7 @@ public interface IWALByteBufferView {
 
   /** Like {@link ByteBuffer#putDouble(double)}. */
   void putDouble(double value);
+
+  /** Like {@link ByteBuffer#position()}. */
+  int position();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
index e14ba4943d..4d4d710eb2 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.utils.MmapUtil;
 import org.apache.iotdb.db.wal.exception.WALNodeClosedException;
+import org.apache.iotdb.db.wal.io.WALMetaData;
 import org.apache.iotdb.db.wal.utils.WALFileStatus;
 import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
 
@@ -35,7 +36,6 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
-import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -46,6 +46,8 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import static org.apache.iotdb.db.wal.node.WALNode.DEFAULT_SEARCH_INDEX;
+
 /**
  * This buffer guarantees the concurrent safety and uses double buffers mechanism to accelerate
  * writes and avoid waiting for buffer syncing to disk.
@@ -53,7 +55,7 @@ import java.util.concurrent.locks.ReentrantLock;
 public class WALBuffer extends AbstractWALBuffer {
   private static final Logger logger = LoggerFactory.getLogger(WALBuffer.class);
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-  private static final int WAL_BUFFER_SIZE = config.getWalBufferSize();
+  private static final int HALF_WAL_BUFFER_SIZE = config.getWalBufferSize() / 2;
   private static final int QUEUE_CAPACITY = config.getWalBufferQueueCapacity();
 
   /** whether close method is called */
@@ -102,8 +104,8 @@ public class WALBuffer extends AbstractWALBuffer {
 
   private void allocateBuffers() {
     try {
-      workingBuffer = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE / 2);
-      idleBuffer = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE / 2);
+      workingBuffer = ByteBuffer.allocateDirect(HALF_WAL_BUFFER_SIZE);
+      idleBuffer = ByteBuffer.allocateDirect(HALF_WAL_BUFFER_SIZE);
     } catch (OutOfMemoryError e) {
       logger.error("Fail to allocate wal node-{}'s buffer because out of memory.", identifier, e);
       close();
@@ -129,13 +131,18 @@ public class WALBuffer extends AbstractWALBuffer {
   }
 
   // region Task of serializeThread
+  /** This info class transfers some extra info from serializeThread to syncBufferThread */
+  private static class SerializeInfo {
+    final WALMetaData metaData = new WALMetaData();
+    final List<WALFlushListener> fsyncListeners = new LinkedList<>();
+    WALFlushListener rollWALFileWriterListener = null;
+  }
+
   /** This task serializes WALEntry to workingBuffer and will call fsync at last. */
   private class SerializeTask implements Runnable {
-    private final IWALByteBufferView byteBufferVew = new ByteBufferView();
-    private final List<WALFlushListener> fsyncListeners = new LinkedList<>();
-
+    private final ByteBufferView byteBufferVew = new ByteBufferView();
+    private final SerializeInfo info = new SerializeInfo();
     private int batchSize = 0;
-    private WALFlushListener rollWALFileWriterListener = null;
 
     @Override
     public void run() {
@@ -183,8 +190,7 @@ public class WALBuffer extends AbstractWALBuffer {
 
       // call fsync at last and set fsyncListeners
       if (batchSize > 0) {
-        fsyncWorkingBuffer(
-            currentSearchIndex, currentFileStatus, fsyncListeners, rollWALFileWriterListener);
+        fsyncWorkingBuffer(currentSearchIndex, currentFileStatus, info);
       }
     }
 
@@ -194,13 +200,13 @@ public class WALBuffer extends AbstractWALBuffer {
      */
     private boolean handleWALEntry(WALEntry walEntry) {
       if (walEntry.isSignal()) {
-        return handleSignalEntry((SignalWALEntry) walEntry);
+        return handleSignalEntry((WALSignalEntry) walEntry);
       }
 
       boolean success = handleInfoEntry(walEntry);
       if (success) {
         ++batchSize;
-        fsyncListeners.add(walEntry.getWalFlushListener());
+        info.fsyncListeners.add(walEntry.getWalFlushListener());
       }
       return false;
     }
@@ -211,8 +217,11 @@ public class WALBuffer extends AbstractWALBuffer {
      * @return true if serialization is successful.
      */
     private boolean handleInfoEntry(WALEntry walEntry) {
+      int size = byteBufferVew.position();
       try {
         walEntry.serialize(byteBufferVew);
+        size = byteBufferVew.position() - size;
+        logger.debug("wal entry size is: {}", size);
       } catch (Exception e) {
         logger.error(
             "Fail to serialize WALEntry to wal node-{}'s buffer, discard it.", identifier, e);
@@ -220,14 +229,17 @@ public class WALBuffer extends AbstractWALBuffer {
         return false;
       }
       // update search index
+      long searchIndex = DEFAULT_SEARCH_INDEX;
       if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
           || walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
         InsertNode insertNode = (InsertNode) walEntry.getValue();
         if (insertNode.getSearchIndex() != InsertNode.NO_CONSENSUS_INDEX) {
+          searchIndex = insertNode.getSearchIndex();
           currentSearchIndex = insertNode.getSearchIndex();
           currentFileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
         }
       }
+      info.metaData.add(size, searchIndex);
       return true;
     }
 
@@ -235,13 +247,12 @@ public class WALBuffer extends AbstractWALBuffer {
      * @return true if fsyncWorkingBuffer has been called, which means this serialization task
      *     should be ended.
      */
-    private boolean handleSignalEntry(SignalWALEntry signalWALEntry) {
-      switch (signalWALEntry.getSignalType()) {
+    private boolean handleSignalEntry(WALSignalEntry walSignalEntry) {
+      switch (walSignalEntry.getType()) {
         case ROLL_WAL_LOG_WRITER_SIGNAL:
           logger.debug("Handle roll log writer signal for wal node-{}.", identifier);
-          rollWALFileWriterListener = signalWALEntry.getWalFlushListener();
-          fsyncWorkingBuffer(
-              currentSearchIndex, currentFileStatus, fsyncListeners, rollWALFileWriterListener);
+          info.rollWALFileWriterListener = walSignalEntry.getWalFlushListener();
+          fsyncWorkingBuffer(currentSearchIndex, currentFileStatus, info);
           return true;
         case CLOSE_SIGNAL:
           logger.debug(
@@ -250,8 +261,7 @@ public class WALBuffer extends AbstractWALBuffer {
               walEntries.size());
           boolean dataExists = batchSize > 0;
           if (dataExists) {
-            fsyncWorkingBuffer(
-                currentSearchIndex, currentFileStatus, fsyncListeners, rollWALFileWriterListener);
+            fsyncWorkingBuffer(currentSearchIndex, currentFileStatus, info);
           }
           return dataExists;
         default:
@@ -265,6 +275,8 @@ public class WALBuffer extends AbstractWALBuffer {
    * serializeThread and this class is only used by serializeThread.
    */
   private class ByteBufferView implements IWALByteBufferView {
+    private int flushedBytesNum = 0;
+
     private void ensureEnoughSpace(int bytesNum) {
       if (workingBuffer.remaining() < bytesNum) {
         rollBuffer();
@@ -272,6 +284,7 @@ public class WALBuffer extends AbstractWALBuffer {
     }
 
     private void rollBuffer() {
+      flushedBytesNum += workingBuffer.position();
       syncWorkingBuffer(currentSearchIndex, currentFileStatus);
     }
 
@@ -333,6 +346,11 @@ public class WALBuffer extends AbstractWALBuffer {
       ensureEnoughSpace(Double.BYTES);
       workingBuffer.putDouble(value);
     }
+
+    @Override
+    public int position() {
+      return flushedBytesNum + workingBuffer.position();
+    }
   }
 
   /** Notice: this method only called when buffer is exhausted by SerializeTask. */
@@ -343,15 +361,9 @@ public class WALBuffer extends AbstractWALBuffer {
   }
 
   /** Notice: this method only called at the last of SerializeTask. */
-  private void fsyncWorkingBuffer(
-      long searchIndex,
-      WALFileStatus fileStatus,
-      List<WALFlushListener> fsyncListeners,
-      WALFlushListener rollWALFileWriterListener) {
+  private void fsyncWorkingBuffer(long searchIndex, WALFileStatus fileStatus, SerializeInfo info) {
     switchWorkingBufferToFlushing();
-    syncBufferThread.submit(
-        new SyncBufferTask(
-            searchIndex, fileStatus, true, fsyncListeners, rollWALFileWriterListener));
+    syncBufferThread.submit(new SyncBufferTask(searchIndex, fileStatus, true, info));
     currentFileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
   }
 
@@ -383,24 +395,18 @@ public class WALBuffer extends AbstractWALBuffer {
     private final long searchIndex;
     private final WALFileStatus fileStatus;
     private final boolean forceFlag;
-    private final List<WALFlushListener> fsyncListeners;
-    private final WALFlushListener rollWALFileWriterListener;
+    private final SerializeInfo info;
 
     public SyncBufferTask(long searchIndex, WALFileStatus fileStatus, boolean forceFlag) {
-      this(searchIndex, fileStatus, forceFlag, null, null);
+      this(searchIndex, fileStatus, forceFlag, null);
     }
 
     public SyncBufferTask(
-        long searchIndex,
-        WALFileStatus fileStatus,
-        boolean forceFlag,
-        List<WALFlushListener> fsyncListeners,
-        WALFlushListener rollWALFileWriterListener) {
+        long searchIndex, WALFileStatus fileStatus, boolean forceFlag, SerializeInfo info) {
       this.searchIndex = searchIndex;
       this.fileStatus = fileStatus;
       this.forceFlag = forceFlag;
-      this.fsyncListeners = fsyncListeners == null ? Collections.emptyList() : fsyncListeners;
-      this.rollWALFileWriterListener = rollWALFileWriterListener;
+      this.info = info == null ? new SerializeInfo() : info;
     }
 
     @Override
@@ -409,7 +415,7 @@ public class WALBuffer extends AbstractWALBuffer {
 
       // flush buffer to os
       try {
-        currentWALFileWriter.write(syncingBuffer);
+        currentWALFileWriter.write(syncingBuffer, info.metaData);
       } catch (Throwable e) {
         logger.error(
             "Fail to sync wal node-{}'s buffer, change system mode to read-only.", identifier, e);
@@ -427,32 +433,32 @@ public class WALBuffer extends AbstractWALBuffer {
               "Fail to fsync wal node-{}'s log writer, change system mode to read-only.",
               identifier,
               e);
-          for (WALFlushListener fsyncListener : fsyncListeners) {
+          for (WALFlushListener fsyncListener : info.fsyncListeners) {
             fsyncListener.fail(e);
           }
           config.setReadOnly(true);
         }
         // notify all waiting listeners
-        for (WALFlushListener fsyncListener : fsyncListeners) {
+        for (WALFlushListener fsyncListener : info.fsyncListeners) {
           fsyncListener.succeed();
         }
       }
 
       // try to roll log writer
-      if (rollWALFileWriterListener != null
+      if (info.rollWALFileWriterListener != null
           || (forceFlag && currentWALFileWriter.size() >= config.getWalFileSizeThresholdInByte())) {
         try {
           rollLogWriter(searchIndex, currentWALFileWriter.getWalFileStatus());
-          if (rollWALFileWriterListener != null) {
-            rollWALFileWriterListener.succeed();
+          if (info.rollWALFileWriterListener != null) {
+            info.rollWALFileWriterListener.succeed();
           }
         } catch (IOException e) {
           logger.error(
               "Fail to roll wal node-{}'s log writer, change system mode to read-only.",
               identifier,
               e);
-          if (rollWALFileWriterListener != null) {
-            rollWALFileWriterListener.fail(e);
+          if (info.rollWALFileWriterListener != null) {
+            info.rollWALFileWriterListener.fail(e);
           }
           config.setReadOnly(true);
         }
@@ -502,7 +508,7 @@ public class WALBuffer extends AbstractWALBuffer {
     if (serializeThread != null) {
       // add close signal WALEntry to notify serializeThread
       try {
-        walEntries.put(new SignalWALEntry(SignalWALEntry.SignalType.CLOSE_SIGNAL));
+        walEntries.put(new WALSignalEntry(WALEntryType.CLOSE_SIGNAL));
       } catch (InterruptedException e) {
         logger.error("Fail to put CLOSE_SIGNAL to walEntries.", e);
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java
index 0c9140c778..27ae7f867d 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java
@@ -19,10 +19,9 @@
 package org.apache.iotdb.db.wal.buffer;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.memtable.AbstractMemTable;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
@@ -31,56 +30,34 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.utils.SerializedSize;
-import org.apache.iotdb.db.wal.utils.WALMode;
 import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Objects;
 
 /**
  * WALEntry is the basic element of .wal file, including type, memTable id, and specific
  * value(physical plan or memTable snapshot).
  */
-public class WALEntry implements SerializedSize {
-  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-
-  /** wal entry type 1 byte, memTable id 8 bytes */
-  private static final int FIXED_SERIALIZED_SIZE = Byte.BYTES + Long.BYTES;
+public abstract class WALEntry implements SerializedSize {
+  private static final Logger logger = LoggerFactory.getLogger(WALEntry.class);
 
   /** type of value */
-  private final WALEntryType type;
+  protected final WALEntryType type;
   /** memTable id */
-  private final long memTableId;
+  protected final long memTableId;
   /** value(physical plan or memTable snapshot) */
-  private final WALEntryValue value;
-  /** extra info for InsertTabletPlan type value */
-  private TabletInfo tabletInfo;
-
+  protected final WALEntryValue value;
   /**
    * listen whether this WALEntry has been written to the filesystem, null iff this WALEntry is
    * deserialized from .wal file
    */
-  private final WALFlushListener walFlushListener;
-
-  public WALEntry(long memTableId, WALEntryValue value) {
-    this(memTableId, value, config.getWalMode() == WALMode.SYNC);
-    if (value instanceof InsertTabletPlan) {
-      tabletInfo = new TabletInfo(0, ((InsertTabletPlan) value).getRowCount());
-    } else if (value instanceof InsertTabletNode) {
-      tabletInfo = new TabletInfo(0, ((InsertTabletNode) value).getRowCount());
-    }
-  }
-
-  public WALEntry(long memTableId, InsertTabletPlan value, int tabletStart, int tabletEnd) {
-    this(memTableId, value, config.getWalMode() == WALMode.SYNC);
-    tabletInfo = new TabletInfo(tabletStart, tabletEnd);
-  }
-
-  public WALEntry(long memTableId, InsertTabletNode value, int tabletStart, int tabletEnd) {
-    this(memTableId, value, config.getWalMode() == WALMode.SYNC);
-    tabletInfo = new TabletInfo(tabletStart, tabletEnd);
-  }
+  protected final WALFlushListener walFlushListener;
 
   public WALEntry(long memTableId, WALEntryValue value, boolean wait) {
     this.memTableId = memTableId;
@@ -103,38 +80,14 @@ public class WALEntry implements SerializedSize {
     walFlushListener = new WALFlushListener(wait);
   }
 
-  private WALEntry(WALEntryType type, long memTableId, WALEntryValue value) {
+  protected WALEntry(WALEntryType type, long memTableId, WALEntryValue value, boolean wait) {
     this.type = type;
     this.memTableId = memTableId;
     this.value = value;
-    this.walFlushListener = null;
+    this.walFlushListener = new WALFlushListener(wait);
   }
 
-  @Override
-  public int serializedSize() {
-    return FIXED_SERIALIZED_SIZE + value.serializedSize();
-  }
-
-  public void serialize(IWALByteBufferView buffer) {
-    buffer.put(type.getCode());
-    buffer.putLong(memTableId);
-    switch (type) {
-      case INSERT_TABLET_PLAN:
-        ((InsertTabletPlan) value)
-            .serializeToWAL(buffer, tabletInfo.tabletStart, tabletInfo.tabletEnd);
-        break;
-      case INSERT_TABLET_NODE:
-        ((InsertTabletNode) value)
-            .serializeToWAL(buffer, tabletInfo.tabletStart, tabletInfo.tabletEnd);
-        break;
-      case INSERT_ROW_PLAN:
-      case INSERT_ROW_NODE:
-      case DELETE_PLAN:
-      case MEMORY_TABLE_SNAPSHOT:
-        value.serializeToWAL(buffer);
-        break;
-    }
-  }
+  public abstract void serialize(IWALByteBufferView buffer);
 
   public static WALEntry deserialize(DataInputStream stream)
       throws IllegalPathException, IOException {
@@ -144,6 +97,15 @@ public class WALEntry implements SerializedSize {
       throw new IOException("unrecognized wal entry type " + typeNum);
     }
 
+    // handle signal
+    switch (type) {
+      case CLOSE_SIGNAL:
+      case ROLL_WAL_LOG_WRITER_SIGNAL:
+      case WAL_FILE_INFO_END_MARKER:
+        return new WALSignalEntry(type);
+    }
+
+    // handle info
     long memTableId = stream.readLong();
     WALEntryValue value = null;
     switch (type) {
@@ -160,13 +122,30 @@ public class WALEntry implements SerializedSize {
         value = AbstractMemTable.Factory.create(stream);
         break;
       case INSERT_ROW_NODE:
-        value = (InsertRowNode) PlanNodeType.deserialize(stream);
+        value = (InsertRowNode) PlanNodeType.deserializeFromWAL(stream);
         break;
       case INSERT_TABLET_NODE:
-        value = (InsertTabletNode) PlanNodeType.deserialize(stream);
+        value = (InsertTabletNode) PlanNodeType.deserializeFromWAL(stream);
         break;
     }
-    return new WALEntry(type, memTableId, value);
+    return new WALInfoEntry(type, memTableId, value);
+  }
+
+  /**
+   * This deserialization method is only for multi-leader consensus and just deserializes
+   * InsertRowNode and InsertTabletNode
+   */
+  public static PlanNode deserializeInsertNode(ByteBuffer buffer) {
+    logger.debug(
+        "buffer capacity is: {}, limit is: {}, position is: {}",
+        buffer.capacity(),
+        buffer.limit(),
+        buffer.position());
+    // wal entry type
+    buffer.get();
+    // memtable id
+    buffer.getLong();
+    return PlanNodeType.deserializeFromWAL(buffer);
   }
 
   @Override
@@ -202,19 +181,5 @@ public class WALEntry implements SerializedSize {
     return walFlushListener;
   }
 
-  public boolean isSignal() {
-    return false;
-  }
-
-  private static class TabletInfo {
-    /** start row of insert tablet */
-    private final int tabletStart;
-    /** end row of insert tablet */
-    private final int tabletEnd;
-
-    public TabletInfo(int tabletStart, int tabletEnd) {
-      this.tabletStart = tabletStart;
-      this.tabletEnd = tabletEnd;
-    }
-  }
+  public abstract boolean isSignal();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntryType.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntryType.java
index 9a6c8eacd4..d6483db413 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntryType.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntryType.java
@@ -18,8 +18,9 @@
  */
 package org.apache.iotdb.db.wal.buffer;
 
-/** Type of {@link WALEntry} */
+/** Type of {@link WALEntry}, including info type and signal type */
 public enum WALEntryType {
+  // region info entry type
   /** {@link org.apache.iotdb.db.qp.physical.crud.InsertRowPlan} */
   INSERT_ROW_PLAN((byte) 0),
   /** {@link org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan} */
@@ -32,7 +33,16 @@ public enum WALEntryType {
   INSERT_ROW_NODE((byte) 4),
   /** {@link org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode} */
   INSERT_TABLET_NODE((byte) 5),
-  ;
+  // endregion
+  // region signal entry type
+  /** signal wal buffer has been closed */
+  CLOSE_SIGNAL(Byte.MIN_VALUE),
+  /** signal wal buffer to roll wal log writer */
+  ROLL_WAL_LOG_WRITER_SIGNAL((byte) (Byte.MIN_VALUE + 1)),
+  /** mark the wal file info part ends */
+  WAL_FILE_INFO_END_MARKER((byte) (Byte.MIN_VALUE + 2)),
+// endregion
+;
 
   private final byte code;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntryValue.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntryValue.java
index 95b3ddc20d..df9a24c308 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntryValue.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntryValue.java
@@ -23,8 +23,9 @@ import org.apache.iotdb.db.utils.SerializedSize;
 /** A class implements this interface can be written into .wal file. */
 public interface WALEntryValue extends SerializedSize {
   /**
-   * Serialize using {@link IWALByteBufferView}, which encapsulates some actions to deal with {@link
-   * java.nio.BufferOverflowException} occurs in {@link java.nio.ByteBuffer}.
+   * Serialize using {@link org.apache.iotdb.db.wal.buffer.IWALByteBufferView}, which encapsulates
+   * some actions to deal with the {@link java.nio.BufferOverflowException} that may occur in
+   * {@link java.nio.ByteBuffer}.
    */
   void serializeToWAL(IWALByteBufferView buffer);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALInfoEntry.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALInfoEntry.java
new file mode 100644
index 0000000000..58635400b7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALInfoEntry.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.buffer;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.wal.utils.WALMode;
+
+/** This entry class stores info for persistence */
+public class WALInfoEntry extends WALEntry {
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  /** wal entry type 1 byte, memTable id 8 bytes */
+  public static final int FIXED_SERIALIZED_SIZE = Byte.BYTES + Long.BYTES;
+
+  /** extra info for InsertTabletPlan or InsertTabletNode type value */
+  private TabletInfo tabletInfo;
+
+  public WALInfoEntry(long memTableId, WALEntryValue value, boolean wait) {
+    super(memTableId, value, wait);
+  }
+
+  public WALInfoEntry(long memTableId, WALEntryValue value) {
+    this(memTableId, value, config.getWalMode() == WALMode.SYNC);
+    if (value instanceof InsertTabletPlan) {
+      tabletInfo = new TabletInfo(0, ((InsertTabletPlan) value).getRowCount());
+    } else if (value instanceof InsertTabletNode) {
+      tabletInfo = new TabletInfo(0, ((InsertTabletNode) value).getRowCount());
+    }
+  }
+
+  public WALInfoEntry(long memTableId, InsertTabletPlan value, int tabletStart, int tabletEnd) {
+    this(memTableId, value, config.getWalMode() == WALMode.SYNC);
+    tabletInfo = new TabletInfo(tabletStart, tabletEnd);
+  }
+
+  public WALInfoEntry(long memTableId, InsertTabletNode value, int tabletStart, int tabletEnd) {
+    this(memTableId, value, config.getWalMode() == WALMode.SYNC);
+    tabletInfo = new TabletInfo(tabletStart, tabletEnd);
+  }
+
+  WALInfoEntry(WALEntryType type, long memTableId, WALEntryValue value) {
+    super(type, memTableId, value, false);
+  }
+
+  @Override
+  public int serializedSize() {
+    return FIXED_SERIALIZED_SIZE + (value == null ? 0 : value.serializedSize());
+  }
+
+  @Override
+  public void serialize(IWALByteBufferView buffer) {
+    buffer.put(type.getCode());
+    buffer.putLong(memTableId);
+    switch (type) {
+      case INSERT_TABLET_PLAN:
+        ((InsertTabletPlan) value)
+            .serializeToWAL(buffer, tabletInfo.tabletStart, tabletInfo.tabletEnd);
+        break;
+      case INSERT_TABLET_NODE:
+        ((InsertTabletNode) value)
+            .serializeToWAL(buffer, tabletInfo.tabletStart, tabletInfo.tabletEnd);
+        break;
+      case INSERT_ROW_PLAN:
+      case INSERT_ROW_NODE:
+      case DELETE_PLAN:
+      case MEMORY_TABLE_SNAPSHOT:
+        value.serializeToWAL(buffer);
+        break;
+    }
+  }
+
+  private static class TabletInfo {
+    /** start row of insert tablet */
+    private final int tabletStart;
+    /** end row of insert tablet */
+    private final int tabletEnd;
+
+    public TabletInfo(int tabletStart, int tabletEnd) {
+      this.tabletStart = tabletStart;
+      this.tabletEnd = tabletEnd;
+    }
+  }
+
+  @Override
+  public boolean isSignal() {
+    return false;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/SignalWALEntry.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALSignalEntry.java
similarity index 51%
rename from server/src/main/java/org/apache/iotdb/db/wal/buffer/SignalWALEntry.java
rename to server/src/main/java/org/apache/iotdb/db/wal/buffer/WALSignalEntry.java
index d2e671d587..854c2692a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/SignalWALEntry.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALSignalEntry.java
@@ -18,34 +18,45 @@
  */
 package org.apache.iotdb.db.wal.buffer;
 
-import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import java.nio.ByteBuffer;
 
-/** This class provides a signal to help wal buffer dealing with some special cases */
-public class SignalWALEntry extends WALEntry {
-  private final SignalType signalType;
-
-  public SignalWALEntry(SignalType signalType) {
+/** This entry class provides a signal to help the wal buffer deal with some special cases */
+public class WALSignalEntry extends WALEntry {
+  public WALSignalEntry(WALEntryType signalType) {
     this(signalType, false);
   }
 
-  public SignalWALEntry(SignalType signalType, boolean wait) {
-    super(Long.MIN_VALUE, new DeletePlan(), wait);
-    this.signalType = signalType;
+  public WALSignalEntry(WALEntryType signalType, boolean wait) {
+    super(signalType, Long.MIN_VALUE, null, wait);
+    switch (signalType) {
+      case INSERT_TABLET_PLAN:
+      case INSERT_TABLET_NODE:
+      case INSERT_ROW_PLAN:
+      case INSERT_ROW_NODE:
+      case DELETE_PLAN:
+      case MEMORY_TABLE_SNAPSHOT:
+        throw new RuntimeException("Cannot use wal info type as wal signal type");
+      default:
+        break;
+    }
   }
 
   @Override
-  public boolean isSignal() {
-    return true;
+  public int serializedSize() {
+    return Byte.BYTES;
   }
 
-  public SignalType getSignalType() {
-    return signalType;
+  @Override
+  public void serialize(IWALByteBufferView buffer) {
+    buffer.put(type.getCode());
   }
 
-  public enum SignalType {
-    /** signal wal buffer has been closed */
-    CLOSE_SIGNAL,
-    /** signal wal buffer to roll wal log writer */
-    ROLL_WAL_LOG_WRITER_SIGNAL,
+  public void serialize(ByteBuffer buffer) {
+    buffer.put(type.getCode());
+  }
+
+  @Override
+  public boolean isSignal() {
+    return true;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/LogWriter.java b/server/src/main/java/org/apache/iotdb/db/wal/io/LogWriter.java
index 635022d28d..34815c1564 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/io/LogWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/LogWriter.java
@@ -39,11 +39,10 @@ import java.nio.channels.FileChannel;
 public abstract class LogWriter implements ILogWriter {
   private static final Logger logger = LoggerFactory.getLogger(LogWriter.class);
 
-  private final File logFile;
-  private final FileOutputStream logStream;
-  private final FileChannel logChannel;
-
-  private long size;
+  protected final File logFile;
+  protected final FileOutputStream logStream;
+  protected final FileChannel logChannel;
+  protected long size;
 
   public LogWriter(File logFile) throws FileNotFoundException {
     this.logFile = logFile;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/WALByteBufReader.java b/server/src/main/java/org/apache/iotdb/db/wal/io/WALByteBufReader.java
new file mode 100644
index 0000000000..a993fe7c57
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/WALByteBufReader.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.io;
+
+import org.apache.iotdb.db.wal.buffer.WALEntry;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.Iterator;
+
+import static org.apache.iotdb.db.wal.io.WALWriter.MAGIC_STRING;
+import static org.apache.iotdb.db.wal.io.WALWriter.MAGIC_STRING_BYTES;
+
+/**
+ * This reader returns {@link WALEntry} as {@link ByteBuffer}. The usage of WALByteBufReader is like
+ * that of an {@link Iterator}.
+ */
+public class WALByteBufReader implements Closeable {
+  private final File logFile;
+  private final FileChannel channel;
+  private final WALMetaData metaData;
+  private final Iterator<Integer> sizeIterator;
+
+  public WALByteBufReader(File logFile) throws IOException {
+    this.logFile = logFile;
+    this.channel = FileChannel.open(logFile.toPath(), StandardOpenOption.READ);
+    if (!readTailMagic().equals(MAGIC_STRING)) {
+      throw new IOException(String.format("Broken wal file %s", logFile));
+    }
+    // load metadata size
+    ByteBuffer metadataSizeBuf = ByteBuffer.allocate(Integer.BYTES);
+    long position = channel.size() - MAGIC_STRING_BYTES - Integer.BYTES;
+    channel.read(metadataSizeBuf, position);
+    metadataSizeBuf.flip();
+    // load metadata
+    int metadataSize = metadataSizeBuf.getInt();
+    ByteBuffer metadataBuf = ByteBuffer.allocate(metadataSize);
+    channel.read(metadataBuf, position - metadataSize);
+    metadataBuf.flip();
+    metaData = WALMetaData.deserialize(metadataBuf);
+    // init iterator
+    sizeIterator = metaData.getBuffersSize().iterator();
+    channel.position(0);
+  }
+
+  /** Like {@link Iterator#hasNext()} */
+  public boolean hasNext() {
+    return sizeIterator.hasNext();
+  }
+
+  /** Like {@link Iterator#next()} */
+  public ByteBuffer next() throws IOException {
+    int size = sizeIterator.next();
+    ByteBuffer buffer = ByteBuffer.allocate(size);
+    channel.read(buffer);
+    buffer.clear();
+    return buffer;
+  }
+
+  private String readTailMagic() throws IOException {
+    ByteBuffer magicStringBytes = ByteBuffer.allocate(MAGIC_STRING_BYTES);
+    channel.read(magicStringBytes, channel.size() - MAGIC_STRING_BYTES);
+    magicStringBytes.flip();
+    return new String(magicStringBytes.array());
+  }
+
+  @Override
+  public void close() throws IOException {
+    channel.close();
+  }
+
+  public long getFirstSearchIndex() {
+    return metaData.getFirstSearchIndex();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/WALMetaData.java b/server/src/main/java/org/apache/iotdb/db/wal/io/WALMetaData.java
new file mode 100644
index 0000000000..35bbd5e67d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/WALMetaData.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.io;
+
+import org.apache.iotdb.db.utils.SerializedSize;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.iotdb.db.wal.node.WALNode.DEFAULT_SEARCH_INDEX;
+
+/**
+ * Metadata exists at the end of each wal file, including each entry's size, search index of first
+ * entry and the number of entries.
+ */
+public class WALMetaData implements SerializedSize {
+  /** search index 8 bytes, number of wal entries 4 bytes */
+  private static final int FIXED_SERIALIZED_SIZE = Long.BYTES + Integer.BYTES;
+
+  /** search index of first entry */
+  private long firstSearchIndex;
+
+  /** each entry's size */
+  private final List<Integer> buffersSize;
+
+  public WALMetaData() {
+    this(DEFAULT_SEARCH_INDEX, new ArrayList<>());
+  }
+
+  public WALMetaData(long firstSearchIndex, List<Integer> buffersSize) {
+    this.firstSearchIndex = firstSearchIndex;
+    this.buffersSize = buffersSize;
+  }
+
+  public void add(int size, long searchIndex) {
+    if (buffersSize.isEmpty()) {
+      firstSearchIndex = searchIndex;
+    }
+    buffersSize.add(size);
+  }
+
+  public void addAll(WALMetaData metaData) {
+    if (buffersSize.isEmpty()) {
+      firstSearchIndex = metaData.getFirstSearchIndex();
+    }
+    buffersSize.addAll(metaData.getBuffersSize());
+  }
+
+  @Override
+  public int serializedSize() {
+    return FIXED_SERIALIZED_SIZE + buffersSize.size() * Integer.BYTES;
+  }
+
+  public void serialize(ByteBuffer buffer) {
+    buffer.putLong(firstSearchIndex);
+    buffer.putInt(buffersSize.size());
+    for (int size : buffersSize) {
+      buffer.putInt(size);
+    }
+  }
+
+  public static WALMetaData deserialize(ByteBuffer buffer) {
+    long firstSearchIndex = buffer.getLong();
+    int entriesNum = buffer.getInt();
+    List<Integer> buffersSize = new ArrayList<>(entriesNum);
+    for (int i = 0; i < entriesNum; ++i) {
+      buffersSize.add(buffer.getInt());
+    }
+    return new WALMetaData(firstSearchIndex, buffersSize);
+  }
+
+  public List<Integer> getBuffersSize() {
+    return buffersSize;
+  }
+
+  public long getFirstSearchIndex() {
+    return firstSearchIndex;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/WALReader.java b/server/src/main/java/org/apache/iotdb/db/wal/io/WALReader.java
index 41a0cf145d..24fefc4c0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/io/WALReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/WALReader.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.wal.io;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.wal.buffer.WALEntry;
+import org.apache.iotdb.db.wal.buffer.WALEntryType;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,46 +29,36 @@ import org.slf4j.LoggerFactory;
 import java.io.BufferedInputStream;
 import java.io.Closeable;
 import java.io.DataInputStream;
-import java.io.EOFException;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.NoSuchElementException;
 
 /**
- * The usage of WALReader is like {@link Iterator}, which aims to control the memory usage of
- * reader.
+ * This reader returns {@link WALEntry} directly. The usage of WALReader is like {@link Iterator}.
  */
 public class WALReader implements Closeable {
   private static final Logger logger = LoggerFactory.getLogger(WALReader.class);
   /** 1/10 of .wal file size as buffer size */
   private static final int STREAM_BUFFER_SIZE =
       (int) IoTDBDescriptor.getInstance().getConfig().getWalFileSizeThresholdInByte() / 10;
-  /** 1000 as default batch limit */
-  private static final int BATCH_LIMIT = 1_000;
 
   private final File logFile;
   private final DataInputStream logStream;
-  private final List<WALEntry> walEntries;
-
-  private Iterator<WALEntry> itr = null;
+  private WALEntry nextEntry;
   private boolean fileCorrupted = false;
 
-  public WALReader(File logFile) throws FileNotFoundException {
+  public WALReader(File logFile) throws IOException {
     this.logFile = logFile;
     this.logStream =
         new DataInputStream(
-            new BufferedInputStream(new FileInputStream(logFile), STREAM_BUFFER_SIZE));
-    this.walEntries = new LinkedList<>();
+            new BufferedInputStream(Files.newInputStream(logFile.toPath()), STREAM_BUFFER_SIZE));
   }
 
   /** Like {@link Iterator#hasNext()} */
   public boolean hasNext() {
-    if (itr != null && itr.hasNext()) {
+    if (nextEntry != null) {
       return true;
     }
     // read WALEntries from log stream
@@ -75,14 +66,11 @@ public class WALReader implements Closeable {
       if (fileCorrupted) {
         return false;
       }
-      walEntries.clear();
-      while (walEntries.size() < BATCH_LIMIT) {
-        WALEntry walEntry = WALEntry.deserialize(logStream);
-        walEntries.add(walEntry);
+      nextEntry = WALEntry.deserialize(logStream);
+      if (nextEntry.getType() == WALEntryType.WAL_FILE_INFO_END_MARKER) {
+        nextEntry = null;
+        return false;
       }
-    } catch (EOFException e) {
-      // reach end of wal file
-      fileCorrupted = true;
     } catch (IllegalPathException e) {
       fileCorrupted = true;
       logger.warn(
@@ -92,19 +80,17 @@ public class WALReader implements Closeable {
       logger.warn("Fail to read WALEntry from wal file {}, skip broken WALEntries.", logFile, e);
     }
 
-    if (walEntries.size() != 0) {
-      itr = walEntries.iterator();
-      return true;
-    }
-    return false;
+    return nextEntry != null;
   }
 
   /** Like {@link Iterator#next()} */
   public WALEntry next() {
-    if (itr == null) {
+    if (nextEntry == null) {
       throw new NoSuchElementException();
     }
-    return itr.next();
+    WALEntry next = nextEntry;
+    nextEntry = null;
+    return next;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java b/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java
index 5ce2bcb251..b0f809eab5 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java
@@ -19,19 +19,63 @@
 package org.apache.iotdb.db.wal.io;
 
 import org.apache.iotdb.db.wal.buffer.WALEntry;
+import org.apache.iotdb.db.wal.buffer.WALEntryType;
+import org.apache.iotdb.db.wal.buffer.WALSignalEntry;
 import org.apache.iotdb.db.wal.utils.WALFileStatus;
 
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 
 /** WALWriter writes the binary {@link WALEntry} into .wal file. */
 public class WALWriter extends LogWriter {
+  public static final String MAGIC_STRING = "WAL";
+  public static final int MAGIC_STRING_BYTES = MAGIC_STRING.getBytes().length;
+
   private WALFileStatus walFileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
 
+  /** metadata of this wal file */
+  protected final WALMetaData metaData = new WALMetaData();
+
   public WALWriter(File logFile) throws FileNotFoundException {
     super(logFile);
   }
 
+  /** Writes the buffer and merges the given metadata into this file's metadata */
+  public void write(ByteBuffer buffer, WALMetaData metaData) throws IOException {
+    // update metadata
+    updateMetaData(metaData);
+    // flush buffer
+    write(buffer);
+  }
+
+  public void updateMetaData(WALMetaData metaData) {
+    this.metaData.addAll(metaData);
+  }
+
+  private void endFile() throws IOException {
+    WALSignalEntry endMarker = new WALSignalEntry(WALEntryType.WAL_FILE_INFO_END_MARKER);
+    int metaDataSize = metaData.serializedSize();
+    ByteBuffer buffer =
+        ByteBuffer.allocate(
+            endMarker.serializedSize() + metaDataSize + Integer.BYTES + MAGIC_STRING_BYTES);
+    // mark info part ends
+    endMarker.serialize(buffer);
+    // flush meta data
+    metaData.serialize(buffer);
+    buffer.putInt(metaDataSize);
+    // add magic string
+    buffer.put(MAGIC_STRING.getBytes());
+    write(buffer);
+  }
+
+  @Override
+  public void close() throws IOException {
+    endFile();
+    super.close();
+  }
+
   public void updateFileStatus(WALFileStatus walFileStatus) {
     if (walFileStatus == WALFileStatus.CONTAINS_SEARCH_INDEX) {
       this.walFileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
index 22030be9e8..662465359e 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.wal.node;
 
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
@@ -28,8 +27,6 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.wal.exception.WALException;
 import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
 
-import java.util.List;
-
 /** This class provides fake wal node when wal is disabled or exception happens. */
 public class WALFakeNode implements IWALNode {
   private final WALFlushListener.Status status;
@@ -104,16 +101,6 @@ public class WALFakeNode implements IWALNode {
     throw new UnsupportedOperationException();
   }
 
-  @Override
-  public IConsensusRequest getReq(long index) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public List<IConsensusRequest> getReqs(long startIndex, int num) {
-    throw new UnsupportedOperationException();
-  }
-
   @Override
   public ReqIterator getReqIterator(long startIndex) {
     throw new UnsupportedOperationException();
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index 3e50041e73..f3eb5c2271 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
@@ -34,24 +35,21 @@ import org.apache.iotdb.db.engine.flush.FlushStatus;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.wal.buffer.IWALBuffer;
-import org.apache.iotdb.db.wal.buffer.SignalWALEntry;
 import org.apache.iotdb.db.wal.buffer.WALBuffer;
 import org.apache.iotdb.db.wal.buffer.WALEntry;
 import org.apache.iotdb.db.wal.buffer.WALEntryType;
+import org.apache.iotdb.db.wal.buffer.WALInfoEntry;
+import org.apache.iotdb.db.wal.buffer.WALSignalEntry;
 import org.apache.iotdb.db.wal.checkpoint.CheckpointManager;
 import org.apache.iotdb.db.wal.checkpoint.MemTableInfo;
-import org.apache.iotdb.db.wal.io.WALReader;
+import org.apache.iotdb.db.wal.io.WALByteBufReader;
 import org.apache.iotdb.db.wal.utils.WALFileStatus;
 import org.apache.iotdb.db.wal.utils.WALFileUtils;
 import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
@@ -63,6 +61,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -129,33 +128,33 @@ public class WALNode implements IWALNode {
 
   @Override
   public WALFlushListener log(long memTableId, InsertRowPlan insertRowPlan) {
-    WALEntry walEntry = new WALEntry(memTableId, insertRowPlan);
+    WALEntry walEntry = new WALInfoEntry(memTableId, insertRowPlan);
     return log(walEntry);
   }
 
   @Override
   public WALFlushListener log(long memTableId, InsertRowNode insertRowNode) {
-    WALEntry walEntry = new WALEntry(memTableId, insertRowNode);
+    WALEntry walEntry = new WALInfoEntry(memTableId, insertRowNode);
     return log(walEntry);
   }
 
   @Override
   public WALFlushListener log(
       long memTableId, InsertTabletPlan insertTabletPlan, int start, int end) {
-    WALEntry walEntry = new WALEntry(memTableId, insertTabletPlan, start, end);
+    WALEntry walEntry = new WALInfoEntry(memTableId, insertTabletPlan, start, end);
     return log(walEntry);
   }
 
   @Override
   public WALFlushListener log(
       long memTableId, InsertTabletNode insertTabletNode, int start, int end) {
-    WALEntry walEntry = new WALEntry(memTableId, insertTabletNode, start, end);
+    WALEntry walEntry = new WALInfoEntry(memTableId, insertTabletNode, start, end);
     return log(walEntry);
   }
 
   @Override
   public WALFlushListener log(long memTableId, DeletePlan deletePlan) {
-    WALEntry walEntry = new WALEntry(memTableId, deletePlan);
+    WALEntry walEntry = new WALInfoEntry(memTableId, deletePlan);
     return log(walEntry);
   }
 
@@ -421,7 +420,7 @@ public class WALNode implements IWALNode {
       memTableSnapshotCount.compute(memTable.getMemTableId(), (k, v) -> v == null ? 1 : v + 1);
       // roll wal log writer to make sure first version id will be updated
       WALEntry rollWALFileSignal =
-          new SignalWALEntry(SignalWALEntry.SignalType.ROLL_WAL_LOG_WRITER_SIGNAL, true);
+          new WALSignalEntry(WALEntryType.ROLL_WAL_LOG_WRITER_SIGNAL, true);
       WALFlushListener fileRolledListener = log(rollWALFileSignal);
       if (fileRolledListener.waitForResult() == WALFlushListener.Status.FAILURE) {
         logger.error("Fail to roll wal log writer.", fileRolledListener.getCause());
@@ -437,7 +436,7 @@ public class WALNode implements IWALNode {
           "CheckpointManager$DeleteOutdatedFileTask.snapshotOrFlushOldestMemTable");
       try {
         // log snapshot in a new .wal file
-        WALEntry walEntry = new WALEntry(memTable.getMemTableId(), memTable, true);
+        WALEntry walEntry = new WALInfoEntry(memTable.getMemTableId(), memTable, true);
         WALFlushListener flushListener = log(walEntry);
 
         // wait until getting the result
@@ -463,193 +462,7 @@ public class WALNode implements IWALNode {
     this.safelyDeletedSearchIndex = safelyDeletedSearchIndex;
   }
 
-  /**
-   * Merge insert nodes sharing same search index ( e.g. tablet-100, tablet-100, tablet-100 will be
-   * merged to one multi-tablet). <br>
-   * Notice: the continuity of insert nodes sharing same search index should be protected by the
-   * upper layer.
-   */
-  private static InsertNode mergeInsertNodes(List<InsertNode> insertNodes) {
-    int size = insertNodes.size();
-    if (size == 0) {
-      return null;
-    }
-    if (size == 1) {
-      InsertNode insertNode = insertNodes.get(0);
-      insertNode.setPlanNodeId(new PlanNodeId(""));
-      return insertNode;
-    }
-
-    InsertNode result;
-    if (insertNodes.get(0) instanceof InsertTabletNode) { // merge to InsertMultiTabletsNode
-      List<Integer> index = new ArrayList<>(size);
-      List<InsertTabletNode> insertTabletNodes = new ArrayList<>(size);
-      int i = 0;
-      for (InsertNode insertNode : insertNodes) {
-        insertTabletNodes.add((InsertTabletNode) insertNode);
-        index.add(i);
-        i++;
-      }
-      result = new InsertMultiTabletsNode(new PlanNodeId(""), index, insertTabletNodes);
-    } else { // merge to InsertRowsNode or InsertRowsOfOneDeviceNode
-      boolean sameDevice = true;
-      PartialPath device = insertNodes.get(0).getDevicePath();
-      List<Integer> index = new ArrayList<>(size);
-      List<InsertRowNode> insertRowNodes = new ArrayList<>(size);
-      int i = 0;
-      for (InsertNode insertNode : insertNodes) {
-        if (sameDevice && !insertNode.getDevicePath().equals(device)) {
-          sameDevice = false;
-        }
-        insertRowNodes.add((InsertRowNode) insertNode);
-        index.add(i);
-        i++;
-      }
-      result =
-          sameDevice
-              ? new InsertRowsOfOneDeviceNode(new PlanNodeId(""), index, insertRowNodes)
-              : new InsertRowsNode(new PlanNodeId(""), index, insertRowNodes);
-    }
-    result.setSearchIndex(insertNodes.get(0).getSearchIndex());
-    result.setDevicePath(insertNodes.get(0).getDevicePath());
-    return result;
-  }
-
-  @Override
-  public IConsensusRequest getReq(long index) {
-    // find file
-    File[] currentFiles = WALFileUtils.listAllWALFiles(logDirectory);
-    WALFileUtils.ascSortByVersionId(currentFiles);
-    int fileIndex = WALFileUtils.binarySearchFileBySearchIndex(currentFiles, index);
-    if (fileIndex < 0) {
-      return null;
-    }
-    // find log
-    List<InsertNode> tmpNodes = new ArrayList<>();
-    for (int i = fileIndex; i < currentFiles.length; i++) {
-      // cannot find anymore
-      if (index < WALFileUtils.parseStartSearchIndex(currentFiles[i].getName())) {
-        if (!tmpNodes.isEmpty()) {
-          return mergeInsertNodes(tmpNodes);
-        } else {
-          break;
-        }
-      }
-      // cannot find any in this file
-      if (WALFileUtils.parseStatusCode(currentFiles[i].getName())
-          == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
-        if (!tmpNodes.isEmpty()) {
-          return mergeInsertNodes(tmpNodes);
-        } else {
-          continue;
-        }
-      }
-
-      try (WALReader walReader = new WALReader(currentFiles[i])) {
-        while (walReader.hasNext()) {
-          WALEntry walEntry = walReader.next();
-          if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
-              || walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
-            InsertNode insertNode = (InsertNode) walEntry.getValue();
-            if (insertNode.getSearchIndex() == index) {
-              tmpNodes.add(insertNode);
-            } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
-              return mergeInsertNodes(tmpNodes);
-            }
-          } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
-            return mergeInsertNodes(tmpNodes);
-          }
-        }
-      } catch (FileNotFoundException e) {
-        logger.debug(
-            "WAL file {} has been deleted, try to call getReq({}) again.", currentFiles[i], index);
-        return getReq(index);
-      } catch (Exception e) {
-        logger.error("Fail to read wal from wal file {}", currentFiles[i], e);
-      }
-    }
-    // not find or not complete
-    return null;
-  }
-
-  @Override
-  public List<IConsensusRequest> getReqs(long startIndex, int num) {
-    List<IConsensusRequest> result = new ArrayList<>(num);
-    // find file
-    File[] currentFiles = WALFileUtils.listAllWALFiles(logDirectory);
-    WALFileUtils.ascSortByVersionId(currentFiles);
-    int fileIndex = WALFileUtils.binarySearchFileBySearchIndex(currentFiles, startIndex);
-    if (fileIndex < 0) {
-      return result;
-    }
-    // find logs
-    long endIndex = startIndex + num - 1;
-    long targetIndex = startIndex;
-    List<InsertNode> tmpNodes = new ArrayList<>();
-    for (int i = fileIndex; i < currentFiles.length; i++) {
-      // cannot find anymore
-      if (endIndex < WALFileUtils.parseStartSearchIndex(currentFiles[i].getName())) {
-        if (!tmpNodes.isEmpty()) {
-          result.add(mergeInsertNodes(tmpNodes));
-        } else {
-          break;
-        }
-      }
-      // cannot find any in this file
-      if (WALFileUtils.parseStatusCode(currentFiles[i].getName())
-          == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
-        if (!tmpNodes.isEmpty()) {
-          result.add(mergeInsertNodes(tmpNodes));
-        } else {
-          continue;
-        }
-      }
-
-      try (WALReader walReader = new WALReader(currentFiles[i])) {
-        while (walReader.hasNext()) {
-          WALEntry walEntry = walReader.next();
-          if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
-              || walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
-            InsertNode insertNode = (InsertNode) walEntry.getValue();
-            if (insertNode.getSearchIndex() == targetIndex) {
-              tmpNodes.add(insertNode);
-            } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
-              result.add(mergeInsertNodes(tmpNodes));
-              if (result.size() == num) {
-                return result;
-              }
-              targetIndex++;
-              tmpNodes = new ArrayList<>();
-              // remember to add current insert node
-              if (insertNode.getSearchIndex() == targetIndex) {
-                tmpNodes.add(insertNode);
-              }
-            }
-          } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
-            result.add(mergeInsertNodes(tmpNodes));
-            if (result.size() == num) {
-              return result;
-            }
-            targetIndex++;
-            tmpNodes = new ArrayList<>();
-          }
-        }
-      } catch (FileNotFoundException e) {
-        logger.debug(
-            "WAL file {} has been deleted, try to call getReqs({}, {}) again.",
-            currentFiles[i],
-            startIndex,
-            num);
-        return getReqs(startIndex, num);
-      } catch (Exception e) {
-        logger.error("Fail to read wal from wal file {}", currentFiles[i], e);
-      }
-    }
-
-    return result;
-  }
-
-  /** This iterator is not concurrency-safe */
+  /** This iterator is not concurrency-safe, cannot read the current-writing wal file. */
   @Override
   public ReqIterator getReqIterator(long startIndex) {
     return new PlanNodeIterator(startIndex);
@@ -665,9 +478,9 @@ public class WALNode implements IWALNode {
     /** true means filesToSearch and currentFileIndex are outdated, call updateFilesToSearch */
     private boolean needUpdatingFilesToSearch = true;
     /** batch store insert nodes */
-    private final List<InsertNode> insertNodes = new LinkedList<>();
+    private final List<IndexedConsensusRequest> insertNodes = new LinkedList<>();
     /** iterator of insertNodes */
-    private Iterator<InsertNode> itr = null;
+    private Iterator<IndexedConsensusRequest> itr = null;
 
     public PlanNodeIterator(long startIndex) {
       this.nextSearchIndex = startIndex;
@@ -697,7 +510,7 @@ public class WALNode implements IWALNode {
       while (WALFileUtils.parseStatusCode(filesToSearch[currentFileIndex].getName())
           == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
         currentFileIndex++;
-        if (currentFileIndex >= filesToSearch.length) {
+        if (currentFileIndex >= filesToSearch.length - 1) {
           needUpdatingFilesToSearch = true;
           return false;
         }
@@ -707,34 +520,38 @@ public class WALNode implements IWALNode {
       while (WALFileUtils.parseStatusCode(filesToSearch[currentFileIndex].getName())
           == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
         currentFileIndex++;
-        if (currentFileIndex >= filesToSearch.length) {
+        if (currentFileIndex >= filesToSearch.length - 1) {
           needUpdatingFilesToSearch = true;
           return false;
         }
       }
 
-      // find all insert plan of current wal file
-      List<InsertNode> tmpNodes = new ArrayList<>();
+      // find all insert node of current wal file
+      List<IConsensusRequest> tmpNodes = new ArrayList<>();
       long targetIndex = nextSearchIndex;
-      try (WALReader walReader = new WALReader(filesToSearch[currentFileIndex])) {
-        while (walReader.hasNext()) {
-          WALEntry walEntry = walReader.next();
-          if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
-              || walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
-            InsertNode insertNode = (InsertNode) walEntry.getValue();
-            if (insertNode.getSearchIndex() == targetIndex) {
-              tmpNodes.add(insertNode);
+      try (WALByteBufReader walByteBufReader =
+          new WALByteBufReader(filesToSearch[currentFileIndex])) {
+        while (walByteBufReader.hasNext()) {
+          ByteBuffer buffer = walByteBufReader.next();
+          WALEntryType type = WALEntryType.valueOf(buffer.get());
+          if (type == WALEntryType.INSERT_TABLET_NODE || type == WALEntryType.INSERT_ROW_NODE) {
+            // see WALInfoEntry#serialize, entry type + memtable id + insert node type
+            buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE + PlanNodeType.BYTES);
+            long searchIndex = buffer.getLong();
+            buffer.clear();
+            if (searchIndex == targetIndex) {
+              tmpNodes.add(new MultiLeaderConsensusRequest(buffer));
             } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
-              insertNodes.add(mergeInsertNodes(tmpNodes));
+              insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes));
               targetIndex++;
               tmpNodes = new ArrayList<>();
               // remember to add current insert node
-              if (insertNode.getSearchIndex() == targetIndex) {
-                tmpNodes.add(insertNode);
+              if (searchIndex == targetIndex) {
+                tmpNodes.add(new MultiLeaderConsensusRequest(buffer));
               }
             }
           } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
-            insertNodes.add(mergeInsertNodes(tmpNodes));
+            insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes));
             targetIndex++;
             tmpNodes = new ArrayList<>();
           }
@@ -755,32 +572,42 @@ public class WALNode implements IWALNode {
         currentFileIndex++;
       } else {
         int fileIndex = currentFileIndex + 1;
-        while (!tmpNodes.isEmpty() && fileIndex < filesToSearch.length) {
+        while (!tmpNodes.isEmpty() && fileIndex < filesToSearch.length - 1) {
           // cannot find any in this file, find all slices of last insert plan
           if (WALFileUtils.parseStatusCode(filesToSearch[fileIndex].getName())
               == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
-            insertNodes.add(mergeInsertNodes(tmpNodes));
+            insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes));
             tmpNodes = Collections.emptyList();
             break;
           }
-
-          try (WALReader walReader = new WALReader(filesToSearch[fileIndex])) {
-            while (walReader.hasNext()) {
-              WALEntry walEntry = walReader.next();
-              if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
-                  || walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
-                InsertNode insertNode = (InsertNode) walEntry.getValue();
-                if (insertNode.getSearchIndex() == targetIndex) {
-                  tmpNodes.add(insertNode);
+          // read until find all insert node whose search index equals target index
+          try (WALByteBufReader walByteBufReader = new WALByteBufReader(filesToSearch[fileIndex])) {
+            if (walByteBufReader.getFirstSearchIndex() != targetIndex) {
+              insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes));
+              tmpNodes = Collections.emptyList();
+              break;
+            } else {
+              while (walByteBufReader.hasNext()) {
+                ByteBuffer buffer = walByteBufReader.next();
+                WALEntryType type = WALEntryType.valueOf(buffer.get());
+                if (type == WALEntryType.INSERT_TABLET_NODE
+                    || type == WALEntryType.INSERT_ROW_NODE) {
+                  // see WALInfoEntry#serialize, entry type + memtable id + insert node type
+                  buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE + PlanNodeType.BYTES);
+                  long searchIndex = buffer.getLong();
+                  buffer.clear();
+                  if (searchIndex == targetIndex) {
+                    tmpNodes.add(new MultiLeaderConsensusRequest(buffer));
+                  } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
+                    insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes));
+                    tmpNodes = Collections.emptyList();
+                    break;
+                  }
                 } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
-                  insertNodes.add(mergeInsertNodes(tmpNodes));
+                  insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes));
                   tmpNodes = Collections.emptyList();
                   break;
                 }
-              } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
-                insertNodes.add(mergeInsertNodes(tmpNodes));
-                tmpNodes = Collections.emptyList();
-                break;
               }
             }
           } catch (FileNotFoundException e) {
@@ -806,7 +633,7 @@ public class WALNode implements IWALNode {
       }
 
       // update file index and version id
-      if (currentFileIndex >= filesToSearch.length) {
+      if (currentFileIndex >= filesToSearch.length - 1) {
         needUpdatingFilesToSearch = true;
       }
 
@@ -824,27 +651,27 @@ public class WALNode implements IWALNode {
         throw new NoSuchElementException();
       }
 
-      InsertNode insertNode = itr.next();
-      if (insertNode.getSearchIndex() == nextSearchIndex) {
+      IndexedConsensusRequest request = itr.next();
+      if (request.getSearchIndex() == nextSearchIndex) {
         nextSearchIndex++;
-      } else if (insertNode.getSearchIndex() > nextSearchIndex) {
+      } else if (request.getSearchIndex() > nextSearchIndex) {
         logger.warn(
             "Search index of wal node-{} are not continuously, skip from {} to {}.",
             identifier,
             nextSearchIndex,
-            insertNode.getSearchIndex());
-        skipTo(insertNode.getSearchIndex() + 1);
+            request.getSearchIndex());
+        skipTo(request.getSearchIndex() + 1);
       } else {
         logger.error(
             "Search index of wal node-{} are out of order, {} is before {}.",
             identifier,
             nextSearchIndex,
-            insertNode.getSearchIndex());
+            request.getSearchIndex());
         throw new RuntimeException(
             String.format("Search index of wal node-%s are out of order", identifier));
       }
 
-      return new IndexedConsensusRequest(insertNode.getSearchIndex(), -1, insertNode);
+      return request;
     }
 
     @Override
@@ -893,7 +720,8 @@ public class WALNode implements IWALNode {
       int fileIndex = WALFileUtils.binarySearchFileBySearchIndex(filesToSearch, nextSearchIndex);
       logger.debug(
           "searchIndex: {}, result: {}, files: {}, ", nextSearchIndex, fileIndex, filesToSearch);
-      if (filesToSearch != null && fileIndex >= 0) { // possible to find next
+      if (filesToSearch != null
+          && (fileIndex >= 0 && fileIndex < filesToSearch.length - 1)) { // possible to find next
         this.filesToSearch = filesToSearch;
         this.currentFileIndex = fileIndex;
         this.needUpdatingFilesToSearch = false;
@@ -934,8 +762,7 @@ public class WALNode implements IWALNode {
 
   @TestOnly
   public void rollWALFile() {
-    WALEntry rollWALFileSignal =
-        new SignalWALEntry(SignalWALEntry.SignalType.ROLL_WAL_LOG_WRITER_SIGNAL, true);
+    WALEntry rollWALFileSignal = new WALSignalEntry(WALEntryType.ROLL_WAL_LOG_WRITER_SIGNAL, true);
     WALFlushListener walFlushListener = log(rollWALFileSignal);
     if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
       logger.error(
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java
index ed8042e08d..2bd4e6e114 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.wal.WALManager;
 import org.apache.iotdb.db.wal.buffer.WALEntry;
 import org.apache.iotdb.db.wal.buffer.WALEntryType;
 import org.apache.iotdb.db.wal.checkpoint.MemTableInfo;
+import org.apache.iotdb.db.wal.io.WALMetaData;
 import org.apache.iotdb.db.wal.io.WALReader;
 import org.apache.iotdb.db.wal.recover.file.UnsealedTsFileRecoverPerformer;
 import org.apache.iotdb.db.wal.utils.CheckpointFileUtils;
@@ -39,11 +40,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader.DEFAULT_SEARCH_INDEX;
+
 /** This task is responsible for the recovery of one wal node. */
 public class WALNodeRecoverTask implements Runnable {
   private static final Logger logger = LoggerFactory.getLogger(WALNodeRecoverTask.class);
@@ -103,7 +107,7 @@ public class WALNodeRecoverTask implements Runnable {
         checkpointFile.delete();
       }
       // recover version id and search index
-      long[] indexInfo = recoverLastSearchIndex();
+      long[] indexInfo = recoverLastFile();
       long lastVersionId = indexInfo[0];
       long lastSearchIndex = indexInfo[1];
       WALManager.getInstance()
@@ -118,7 +122,7 @@ public class WALNodeRecoverTask implements Runnable {
     }
   }
 
-  private long[] recoverLastSearchIndex() {
+  private long[] recoverLastFile() {
     File[] walFiles = WALFileUtils.listAllWALFiles(logDirectory);
     if (walFiles == null || walFiles.length == 0) {
       return new long[] {0L, 0L};
@@ -128,22 +132,33 @@ public class WALNodeRecoverTask implements Runnable {
     File lastWALFile = walFiles[walFiles.length - 1];
     long lastVersionId = WALFileUtils.parseVersionId(lastWALFile.getName());
     long lastSearchIndex = WALFileUtils.parseStartSearchIndex(lastWALFile.getName());
+    WALMetaData metaData = new WALMetaData();
     WALFileStatus fileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
     try (WALReader walReader = new WALReader(lastWALFile)) {
       while (walReader.hasNext()) {
         WALEntry walEntry = walReader.next();
+        long searchIndex = DEFAULT_SEARCH_INDEX;
         if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
             || walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
           InsertNode insertNode = (InsertNode) walEntry.getValue();
           if (insertNode.getSearchIndex() != InsertNode.NO_CONSENSUS_INDEX) {
+            searchIndex = insertNode.getSearchIndex();
             lastSearchIndex = Math.max(lastSearchIndex, insertNode.getSearchIndex());
             fileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
           }
         }
+        metaData.add(walEntry.serializedSize(), searchIndex);
       }
     } catch (Exception e) {
       logger.warn("Fail to read wal logs from {}, skip them", lastWALFile, e);
     }
+    // make sure last wal file is correct
+    WALRecoverWriter walRecoverWriter = new WALRecoverWriter(lastWALFile);
+    try {
+      walRecoverWriter.recover(metaData);
+    } catch (IOException e) {
+      logger.error("Fail to recover metadata of wal file {}", lastWALFile);
+    }
     // rename last wal file when file status are inconsistent
     if (WALFileUtils.parseStatusCode(lastWALFile.getName()) != fileStatus) {
       String targetName =
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverWriter.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverWriter.java
new file mode 100644
index 0000000000..92591f7837
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverWriter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.recover;
+
+import org.apache.iotdb.db.wal.io.WALMetaData;
+import org.apache.iotdb.db.wal.io.WALWriter;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+import static org.apache.iotdb.db.wal.io.WALWriter.MAGIC_STRING;
+import static org.apache.iotdb.db.wal.io.WALWriter.MAGIC_STRING_BYTES;
+
+/**
+ * Checks whether a wal file is broken, i.e. does not end with the magic string, and repairs it
+ * by truncating the broken tail and rewriting the metadata.
+ */
+public class WALRecoverWriter {
+  private final File logFile;
+
+  public WALRecoverWriter(File logFile) {
+    this.logFile = logFile;
+  }
+
+  /**
+   * Repairs the wal file when its tail magic string is missing or corrupted.
+   *
+   * @param metaData metadata of the entries to keep; the file is truncated to the sum of its
+   *     buffer sizes before the metadata is written back
+   * @throws IOException if the file cannot be read, truncated or rewritten
+   */
+  public void recover(WALMetaData metaData) throws IOException {
+    if (!readTailMagic().equals(MAGIC_STRING)) {
+      // truncate broken data
+      int size = metaData.getBuffersSize().stream().mapToInt(Integer::intValue).sum();
+      // truncate requires a writable channel; one opened with READ only throws
+      // NonWritableChannelException
+      try (FileChannel channel = FileChannel.open(logFile.toPath(), StandardOpenOption.WRITE)) {
+        channel.truncate(size);
+      }
+      // flush metadata
+      try (WALWriter walWriter = new WALWriter(logFile)) {
+        walWriter.updateMetaData(metaData);
+      }
+    }
+  }
+
+  /**
+   * Reads the last {@code MAGIC_STRING_BYTES} bytes of the file.
+   *
+   * @return the tail of the file as a string, or an empty string when the file is too short to
+   *     contain the magic string
+   */
+  private String readTailMagic() throws IOException {
+    try (FileChannel channel = FileChannel.open(logFile.toPath(), StandardOpenOption.READ)) {
+      long fileSize = channel.size();
+      if (fileSize < MAGIC_STRING_BYTES) {
+        // a negative read position would throw IllegalArgumentException, so report "no magic"
+        return "";
+      }
+      ByteBuffer magicStringBytes = ByteBuffer.allocate(MAGIC_STRING_BYTES);
+      channel.read(magicStringBytes, fileSize - MAGIC_STRING_BYTES);
+      magicStringBytes.flip();
+      return new String(magicStringBytes.array());
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowNodeSerdeTest.java
index 12a8120013..a7eef37d48 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowNodeSerdeTest.java
@@ -85,7 +85,7 @@ public class InsertRowNodeSerdeTest {
 
     Assert.assertEquals(PlanNodeType.INSERT_ROW.ordinal(), dataInputStream.readShort());
 
-    InsertRowNode tmpNode = InsertRowNode.deserialize(dataInputStream);
+    InsertRowNode tmpNode = InsertRowNode.deserializeFromWAL(dataInputStream);
     tmpNode.setPlanNodeId(insertRowNode.getPlanNodeId());
 
     Assert.assertEquals(insertRowNode, tmpNode);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java
index b6604aedb3..9c761d0fbc 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java
@@ -76,7 +76,7 @@ public class InsertTabletNodeSerdeTest {
 
     Assert.assertEquals(PlanNodeType.INSERT_TABLET.ordinal(), dataInputStream.readShort());
 
-    InsertTabletNode tmpNode = InsertTabletNode.deserialize(dataInputStream);
+    InsertTabletNode tmpNode = InsertTabletNode.deserializeFromWAL(dataInputStream);
     tmpNode.setPlanNodeId(insertTabletNode.getPlanNodeId());
 
     Assert.assertEquals(insertTabletNode, tmpNode);
diff --git a/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java b/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
index 6e0bcedab4..c068e4d64b 100644
--- a/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.exception.SystemCheckException;
 import org.apache.iotdb.db.wal.buffer.WALEntry;
+import org.apache.iotdb.db.wal.buffer.WALInfoEntry;
 import org.apache.iotdb.db.wal.io.ILogWriter;
 import org.apache.iotdb.db.wal.io.WALFileTest;
 import org.apache.iotdb.db.wal.io.WALWriter;
@@ -86,9 +87,10 @@ public class WalCheckerTest {
                 walNodeDir, WALFileUtils.getLogFileName(i, 0, WALFileStatus.CONTAINS_SEARCH_INDEX));
         int fakeMemTableId = 1;
         List<WALEntry> walEntries = new ArrayList<>();
-        walEntries.add(new WALEntry(fakeMemTableId, WALFileTest.getInsertRowPlan(DEVICE_ID)));
-        walEntries.add(new WALEntry(fakeMemTableId, WALFileTest.getInsertTabletPlan(DEVICE_ID)));
-        walEntries.add(new WALEntry(fakeMemTableId, WALFileTest.getDeletePlan(DEVICE_ID)));
+        walEntries.add(new WALInfoEntry(fakeMemTableId, WALFileTest.getInsertRowPlan(DEVICE_ID)));
+        walEntries.add(
+            new WALInfoEntry(fakeMemTableId, WALFileTest.getInsertTabletPlan(DEVICE_ID)));
+        walEntries.add(new WALInfoEntry(fakeMemTableId, WALFileTest.getDeletePlan(DEVICE_ID)));
         int size = 0;
         for (WALEntry walEntry : walEntries) {
           size += walEntry.serializedSize();
@@ -124,9 +126,10 @@ public class WalCheckerTest {
                 walNodeDir, WALFileUtils.getLogFileName(i, 0, WALFileStatus.CONTAINS_SEARCH_INDEX));
         int fakeMemTableId = 1;
         List<WALEntry> walEntries = new ArrayList<>();
-        walEntries.add(new WALEntry(fakeMemTableId, WALFileTest.getInsertRowPlan(DEVICE_ID)));
-        walEntries.add(new WALEntry(fakeMemTableId, WALFileTest.getInsertTabletPlan(DEVICE_ID)));
-        walEntries.add(new WALEntry(fakeMemTableId, WALFileTest.getDeletePlan(DEVICE_ID)));
+        walEntries.add(new WALInfoEntry(fakeMemTableId, WALFileTest.getInsertRowPlan(DEVICE_ID)));
+        walEntries.add(
+            new WALInfoEntry(fakeMemTableId, WALFileTest.getInsertTabletPlan(DEVICE_ID)));
+        walEntries.add(new WALInfoEntry(fakeMemTableId, WALFileTest.getDeletePlan(DEVICE_ID)));
         int size = 0;
         for (WALEntry walEntry : walEntries) {
           size += walEntry.serializedSize();
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/buffer/WALBufferCommonTest.java b/server/src/test/java/org/apache/iotdb/db/wal/buffer/WALBufferCommonTest.java
index a803a410b6..c6b88c4492 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/buffer/WALBufferCommonTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/buffer/WALBufferCommonTest.java
@@ -114,7 +114,7 @@ public abstract class WALBufferCommonTest {
       InsertRowPlan insertRowPlan = getInsertRowPlan(devicePath + memTableId, i);
       expectedInsertRowPlans.add(insertRowPlan);
 
-      WALEntry walEntry = new WALEntry(memTableId, insertRowPlan);
+      WALEntry walEntry = new WALInfoEntry(memTableId, insertRowPlan);
       walBuffer.write(walEntry);
     }
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java b/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java
index 9b79403465..bf6460d830 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.wal.buffer.WALEntry;
 import org.apache.iotdb.db.wal.buffer.WALEntryType;
+import org.apache.iotdb.db.wal.buffer.WALInfoEntry;
 import org.apache.iotdb.db.wal.utils.WALByteBufferForTest;
 import org.apache.iotdb.db.wal.utils.WALFileStatus;
 import org.apache.iotdb.db.wal.utils.WALFileUtils;
@@ -75,11 +76,11 @@ public class WALFileTest {
   public void testReadNormalFile() throws IOException, IllegalPathException {
     int fakeMemTableId = 1;
     List<WALEntry> expectedWALEntries = new ArrayList<>();
-    expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertRowNode(devicePath)));
-    expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertTabletNode(devicePath)));
-    expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertRowPlan(devicePath)));
-    expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertTabletPlan(devicePath)));
-    expectedWALEntries.add(new WALEntry(fakeMemTableId, getDeletePlan(devicePath)));
+    expectedWALEntries.add(new WALInfoEntry(fakeMemTableId, getInsertRowNode(devicePath)));
+    expectedWALEntries.add(new WALInfoEntry(fakeMemTableId, getInsertTabletNode(devicePath)));
+    expectedWALEntries.add(new WALInfoEntry(fakeMemTableId, getInsertRowPlan(devicePath)));
+    expectedWALEntries.add(new WALInfoEntry(fakeMemTableId, getInsertTabletPlan(devicePath)));
+    expectedWALEntries.add(new WALInfoEntry(fakeMemTableId, getDeletePlan(devicePath)));
     // test WALEntry.serializedSize
     int size = 0;
     for (WALEntry walEntry : expectedWALEntries) {
@@ -122,11 +123,11 @@ public class WALFileTest {
   public void testReadBrokenFile() throws IOException, IllegalPathException {
     int fakeMemTableId = 1;
     List<WALEntry> expectedWALEntries = new ArrayList<>();
-    expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertRowNode(devicePath)));
-    expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertTabletNode(devicePath)));
-    expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertRowPlan(devicePath)));
-    expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertTabletPlan(devicePath)));
-    expectedWALEntries.add(new WALEntry(fakeMemTableId, getDeletePlan(devicePath)));
+    expectedWALEntries.add(new WALInfoEntry(fakeMemTableId, getInsertRowNode(devicePath)));
+    expectedWALEntries.add(new WALInfoEntry(fakeMemTableId, getInsertTabletNode(devicePath)));
+    expectedWALEntries.add(new WALInfoEntry(fakeMemTableId, getInsertRowPlan(devicePath)));
+    expectedWALEntries.add(new WALInfoEntry(fakeMemTableId, getInsertTabletPlan(devicePath)));
+    expectedWALEntries.add(new WALInfoEntry(fakeMemTableId, getDeletePlan(devicePath)));
     // test WALEntry.serializedSize
     int size = Byte.BYTES;
     for (WALEntry walEntry : expectedWALEntries) {
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
index 3350894304..65ab61780c 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
@@ -21,19 +21,18 @@ package org.apache.iotdb.db.wal.node;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.wal.buffer.WALEntry;
 import org.apache.iotdb.db.wal.utils.WALMode;
-import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
@@ -44,7 +43,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -127,170 +125,79 @@ public class ConsensusReqReaderTest {
     // _6-5-1.wal
     insertRowNode = getInsertRowNode(devicePath);
     insertRowNode.setSearchIndex(6);
-    WALFlushListener walFlushListener = walNode.log(0, insertRowNode); // 6
-    walFlushListener.waitForResult();
-  }
-
-  @Test
-  public void scenario01TestGetReq01() throws Exception {
-    simulateFileScenario01();
-
-    IConsensusRequest request;
-    request = walNode.getReq(1);
-    Assert.assertTrue(request instanceof InsertRowNode);
-    Assert.assertEquals(1, ((InsertRowNode) request).getSearchIndex());
-    request = walNode.getReq(2);
-    Assert.assertTrue(request instanceof InsertRowsOfOneDeviceNode);
-    Assert.assertEquals(2, ((InsertRowsOfOneDeviceNode) request).getSearchIndex());
-    Assert.assertEquals(
-        3, ((InsertRowsOfOneDeviceNode) request).getInsertRowNodeIndexList().size());
-    request = walNode.getReq(3);
-    Assert.assertTrue(request instanceof InsertRowsNode);
-    Assert.assertEquals(3, ((InsertRowsNode) request).getSearchIndex());
-    Assert.assertEquals(3, ((InsertRowsNode) request).getInsertRowNodeIndexList().size());
-    request = walNode.getReq(4);
-    Assert.assertTrue(request instanceof InsertMultiTabletsNode);
-    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
-    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
-    request = walNode.getReq(5);
-    Assert.assertTrue(request instanceof InsertTabletNode);
-    Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
-    request = walNode.getReq(6);
-    Assert.assertNull(request);
-  }
-
-  @Test
-  public void scenario01TestGetReqs01() throws Exception {
-    simulateFileScenario01();
-    List<IConsensusRequest> requests;
-    IConsensusRequest request;
-
-    requests = walNode.getReqs(1, 6);
-    Assert.assertEquals(5, requests.size());
-    request = requests.get(0);
-    Assert.assertTrue(request instanceof InsertRowNode);
-    Assert.assertEquals(1, ((InsertRowNode) request).getSearchIndex());
-    request = requests.get(1);
-    Assert.assertTrue(request instanceof InsertRowsOfOneDeviceNode);
-    Assert.assertEquals(2, ((InsertRowsOfOneDeviceNode) request).getSearchIndex());
-    Assert.assertEquals(
-        3, ((InsertRowsOfOneDeviceNode) request).getInsertRowNodeIndexList().size());
-    request = requests.get(2);
-    Assert.assertTrue(request instanceof InsertRowsNode);
-    Assert.assertEquals(3, ((InsertRowsNode) request).getSearchIndex());
-    Assert.assertEquals(3, ((InsertRowsNode) request).getInsertRowNodeIndexList().size());
-    request = requests.get(3);
-    Assert.assertTrue(request instanceof InsertMultiTabletsNode);
-    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
-    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
-    request = requests.get(4);
-    Assert.assertTrue(request instanceof InsertTabletNode);
-    Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
-  }
-
-  @Test
-  public void scenario01TestGetReqs02() throws Exception {
-    simulateFileScenario01();
-    List<IConsensusRequest> requests;
-    IConsensusRequest request;
-
-    requests = walNode.getReqs(3, 1);
-    Assert.assertEquals(1, requests.size());
-    request = requests.get(0);
-    Assert.assertTrue(request instanceof InsertRowsNode);
-    Assert.assertEquals(3, ((InsertRowsNode) request).getSearchIndex());
-    Assert.assertEquals(3, ((InsertRowsNode) request).getInsertRowNodeIndexList().size());
-  }
-
-  @Test
-  public void scenario01TestGetReqs03() throws Exception {
-    simulateFileScenario01();
-    List<IConsensusRequest> requests;
-    IConsensusRequest request;
-
-    requests = walNode.getReqs(4, 2);
-    Assert.assertEquals(2, requests.size());
-    request = requests.get(0);
-    Assert.assertTrue(request instanceof InsertMultiTabletsNode);
-    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
-    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
-    request = requests.get(1);
-    Assert.assertTrue(request instanceof InsertTabletNode);
-    Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
-  }
-
-  @Test
-  public void scenario01TestGetReqs04() throws Exception {
-    simulateFileScenario01();
-    List<IConsensusRequest> requests;
-    IConsensusRequest request;
-
-    requests = walNode.getReqs(5, 100);
-    Assert.assertEquals(1, requests.size());
-    request = requests.get(0);
-    Assert.assertTrue(request instanceof InsertTabletNode);
-    Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
-  }
-
-  @Test
-  public void scenario01TestGetReqs05() throws Exception {
-    simulateFileScenario01();
-    List<IConsensusRequest> requests;
-
-    requests = walNode.getReqs(6, 100);
-    Assert.assertEquals(0, requests.size());
+    walNode.log(0, insertRowNode); // 6
   }
 
   @Test
   public void scenario01TestGetReqIterator01() throws Exception {
     simulateFileScenario01();
-    IConsensusRequest request;
+    walNode.rollWALFile();
+
+    IndexedConsensusRequest request;
+    PlanNode planNode;
     ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
 
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next().getRequest();
-    Assert.assertTrue(request instanceof InsertRowNode);
-    Assert.assertEquals(1, ((InsertRowNode) request).getSearchIndex());
+    request = iterator.next();
+    Assert.assertEquals(1, request.getRequests().size());
+    for (IConsensusRequest innerRequest : request.getRequests()) {
+      planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+      Assert.assertTrue(planNode instanceof InsertRowNode);
+      Assert.assertEquals(1, ((InsertRowNode) planNode).getSearchIndex());
+    }
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next().getRequest();
-    Assert.assertTrue(request instanceof InsertRowsOfOneDeviceNode);
-    Assert.assertEquals(2, ((InsertRowsOfOneDeviceNode) request).getSearchIndex());
-    Assert.assertEquals(
-        3, ((InsertRowsOfOneDeviceNode) request).getInsertRowNodeIndexList().size());
+    request = iterator.next();
+    Assert.assertEquals(3, request.getRequests().size());
+    for (IConsensusRequest innerRequest : request.getRequests()) {
+      planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+      Assert.assertTrue(planNode instanceof InsertRowNode);
+      Assert.assertEquals(2, ((InsertRowNode) planNode).getSearchIndex());
+    }
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next().getRequest();
-    Assert.assertTrue(request instanceof InsertRowsNode);
-    Assert.assertEquals(3, ((InsertRowsNode) request).getSearchIndex());
-    Assert.assertEquals(3, ((InsertRowsNode) request).getInsertRowNodeIndexList().size());
+    request = iterator.next();
+    Assert.assertEquals(3, request.getRequests().size());
+    for (IConsensusRequest innerRequest : request.getRequests()) {
+      planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+      Assert.assertTrue(planNode instanceof InsertRowNode);
+      Assert.assertEquals(3, ((InsertRowNode) planNode).getSearchIndex());
+    }
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next().getRequest();
-    Assert.assertTrue(request instanceof InsertMultiTabletsNode);
-    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
-    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
+    request = iterator.next();
+    Assert.assertEquals(4, request.getRequests().size());
+    for (IConsensusRequest innerRequest : request.getRequests()) {
+      planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+      Assert.assertTrue(planNode instanceof InsertTabletNode);
+      Assert.assertEquals(4, ((InsertTabletNode) planNode).getSearchIndex());
+    }
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next().getRequest();
-    Assert.assertTrue(request instanceof InsertTabletNode);
-    Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
+    request = iterator.next();
+    Assert.assertEquals(1, request.getRequests().size());
+    for (IConsensusRequest innerRequest : request.getRequests()) {
+      planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+      Assert.assertTrue(planNode instanceof InsertTabletNode);
+      Assert.assertEquals(5, ((InsertTabletNode) planNode).getSearchIndex());
+    }
     Assert.assertFalse(iterator.hasNext());
   }
 
   @Test
   public void scenario01TestGetReqIterator02() throws Exception {
     simulateFileScenario01();
-    IConsensusRequest request;
+
+    IndexedConsensusRequest request;
+    PlanNode planNode;
     ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(4);
 
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next().getRequest();
-    Assert.assertTrue(request instanceof InsertMultiTabletsNode);
-    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
-    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
-    Assert.assertTrue(iterator.hasNext());
-    request = iterator.next().getRequest();
-    Assert.assertTrue(request instanceof InsertTabletNode);
-    Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
-
+    request = iterator.next();
+    Assert.assertEquals(4, request.getRequests().size());
+    for (IConsensusRequest innerRequest : request.getRequests()) {
+      planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+      Assert.assertTrue(planNode instanceof InsertTabletNode);
+      Assert.assertEquals(4, ((InsertTabletNode) planNode).getSearchIndex());
+    }
     Assert.assertFalse(iterator.hasNext());
+
     // wait for next
     ExecutorService checkThread = Executors.newSingleThreadExecutor();
     Future<Boolean> future =
@@ -298,14 +205,19 @@ public class ConsensusReqReaderTest {
             () -> {
               iterator.waitForNextReady();
               Assert.assertTrue(iterator.hasNext());
-              IConsensusRequest req = iterator.next().getRequest();
-              ;
-              Assert.assertTrue(req instanceof InsertRowNode);
-              Assert.assertEquals(6, ((InsertRowNode) req).getSearchIndex());
+              IndexedConsensusRequest req = iterator.next();
+              Assert.assertEquals(1, req.getRequests().size());
+              for (IConsensusRequest innerRequest : req.getRequests()) {
+                PlanNode node =
+                    WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+                Assert.assertTrue(node instanceof InsertTabletNode);
+                Assert.assertEquals(5, ((InsertTabletNode) node).getSearchIndex());
+              }
               return true;
             });
 
     Thread.sleep(500);
+    walNode.rollWALFile();
     InsertRowNode insertRowNode = getInsertRowNode(devicePath);
     walNode.log(0, insertRowNode); // put -1 after 6
     Assert.assertTrue(future.get());
@@ -314,26 +226,35 @@ public class ConsensusReqReaderTest {
   @Test
   public void scenario01TestGetReqIterator03() throws Exception {
     simulateFileScenario01();
-    IConsensusRequest request;
     ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(5);
 
-    Assert.assertTrue(iterator.hasNext());
-    request = iterator.next().getRequest();
-    Assert.assertTrue(request instanceof InsertTabletNode);
-    Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
-
     Assert.assertFalse(iterator.hasNext());
+
     // wait for next
     ExecutorService checkThread = Executors.newSingleThreadExecutor();
     Future<Boolean> future =
         checkThread.submit(
             () -> {
+              iterator.waitForNextReady();
+              IndexedConsensusRequest request;
+              PlanNode planNode;
+              Assert.assertTrue(iterator.hasNext());
+              request = iterator.next();
+              Assert.assertEquals(1, request.getRequests().size());
+              for (IConsensusRequest innerRequest : request.getRequests()) {
+                planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+                Assert.assertTrue(planNode instanceof InsertTabletNode);
+                Assert.assertEquals(5, ((InsertTabletNode) planNode).getSearchIndex());
+              }
               iterator.waitForNextReady();
               Assert.assertTrue(iterator.hasNext());
-              IConsensusRequest req = iterator.next().getRequest();
-              ;
-              Assert.assertTrue(req instanceof InsertRowNode);
-              Assert.assertEquals(6, ((InsertRowNode) req).getSearchIndex());
+              request = iterator.next();
+              Assert.assertEquals(1, request.getRequests().size());
+              for (IConsensusRequest innerRequest : request.getRequests()) {
+                planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+                Assert.assertTrue(planNode instanceof InsertRowNode);
+                Assert.assertEquals(6, ((InsertRowNode) planNode).getSearchIndex());
+              }
               return true;
             });
 
@@ -341,73 +262,125 @@ public class ConsensusReqReaderTest {
     walNode.rollWALFile();
     InsertRowNode insertRowNode = getInsertRowNode(devicePath);
     walNode.log(0, insertRowNode); // put -1 after 6
+    walNode.rollWALFile();
+    insertRowNode = getInsertRowNode(devicePath);
+    walNode.log(0, insertRowNode); // put -1 after 6
     Assert.assertTrue(future.get());
   }
 
   @Test
   public void scenario01TestGetReqIterator04() throws Exception {
     simulateFileScenario01();
-    IConsensusRequest request;
+    walNode.rollWALFile();
+
+    IndexedConsensusRequest request;
+    PlanNode planNode;
     ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
 
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next().getRequest();
-    Assert.assertTrue(request instanceof InsertRowNode);
-    Assert.assertEquals(1, ((InsertRowNode) request).getSearchIndex());
+    request = iterator.next();
+    Assert.assertEquals(1, request.getRequests().size());
+    for (IConsensusRequest innerRequest : request.getRequests()) {
+      planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+      Assert.assertTrue(planNode instanceof InsertRowNode);
+      Assert.assertEquals(1, ((InsertRowNode) planNode).getSearchIndex());
+    }
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next().getRequest();
-    Assert.assertTrue(request instanceof InsertRowsOfOneDeviceNode);
-    Assert.assertEquals(2, ((InsertRowsOfOneDeviceNode) request).getSearchIndex());
-    Assert.assertEquals(
-        3, ((InsertRowsOfOneDeviceNode) request).getInsertRowNodeIndexList().size());
+    request = iterator.next();
+    Assert.assertEquals(3, request.getRequests().size());
+    for (IConsensusRequest innerRequest : request.getRequests()) {
+      planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+      Assert.assertTrue(planNode instanceof InsertRowNode);
+      Assert.assertEquals(2, ((InsertRowNode) planNode).getSearchIndex());
+    }
 
     iterator.skipTo(4);
 
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next().getRequest();
-    Assert.assertTrue(request instanceof InsertMultiTabletsNode);
-    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
-    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
+    request = iterator.next();
+    Assert.assertEquals(4, request.getRequests().size());
+    for (IConsensusRequest innerRequest : request.getRequests()) {
+      planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+      Assert.assertTrue(planNode instanceof InsertTabletNode);
+      Assert.assertEquals(4, ((InsertTabletNode) planNode).getSearchIndex());
+    }
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next().getRequest();
-    Assert.assertTrue(request instanceof InsertTabletNode);
-    Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
+    request = iterator.next();
+    Assert.assertEquals(1, request.getRequests().size());
+    for (IConsensusRequest innerRequest : request.getRequests()) {
+      planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+      Assert.assertTrue(planNode instanceof InsertTabletNode);
+      Assert.assertEquals(5, ((InsertTabletNode) planNode).getSearchIndex());
+    }
     Assert.assertFalse(iterator.hasNext());
   }
 
   @Test
   public void scenario01TestGetReqIterator05() throws Exception {
     simulateFileScenario01();
-    IConsensusRequest request;
+    walNode.rollWALFile();
+
+    IndexedConsensusRequest request;
+    PlanNode planNode;
     ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(5);
 
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next().getRequest();
-    Assert.assertTrue(request instanceof InsertTabletNode);
-    Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
+    request = iterator.next();
+    Assert.assertEquals(1, request.getRequests().size());
+    for (IConsensusRequest innerRequest : request.getRequests()) {
+      planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+      Assert.assertTrue(planNode instanceof InsertTabletNode);
+      Assert.assertEquals(5, ((InsertTabletNode) planNode).getSearchIndex());
+    }
 
     iterator.skipTo(2);
 
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next().getRequest();
-    Assert.assertTrue(request instanceof InsertRowsOfOneDeviceNode);
-    Assert.assertEquals(2, ((InsertRowsOfOneDeviceNode) request).getSearchIndex());
-    Assert.assertEquals(
-        3, ((InsertRowsOfOneDeviceNode) request).getInsertRowNodeIndexList().size());
+    request = iterator.next();
+    Assert.assertEquals(3, request.getRequests().size());
+    for (IConsensusRequest innerRequest : request.getRequests()) {
+      planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+      Assert.assertTrue(planNode instanceof InsertRowNode);
+      Assert.assertEquals(2, ((InsertRowNode) planNode).getSearchIndex());
+    }
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next().getRequest();
-    Assert.assertTrue(request instanceof InsertRowsNode);
-    Assert.assertEquals(3, ((InsertRowsNode) request).getSearchIndex());
-    Assert.assertEquals(3, ((InsertRowsNode) request).getInsertRowNodeIndexList().size());
+    request = iterator.next();
+    Assert.assertEquals(3, request.getRequests().size());
+    for (IConsensusRequest innerRequest : request.getRequests()) {
+      planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+      Assert.assertTrue(planNode instanceof InsertRowNode);
+      Assert.assertEquals(3, ((InsertRowNode) planNode).getSearchIndex());
+    }
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next().getRequest();
-    Assert.assertTrue(request instanceof InsertMultiTabletsNode);
-    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
-    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
+    request = iterator.next();
+    Assert.assertEquals(4, request.getRequests().size());
+    for (IConsensusRequest innerRequest : request.getRequests()) {
+      planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+      Assert.assertTrue(planNode instanceof InsertTabletNode);
+      Assert.assertEquals(4, ((InsertTabletNode) planNode).getSearchIndex());
+    }
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next().getRequest();
-    Assert.assertTrue(request instanceof InsertTabletNode);
-    Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
+    request = iterator.next();
+    Assert.assertEquals(1, request.getRequests().size());
+    for (IConsensusRequest innerRequest : request.getRequests()) {
+      planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+      Assert.assertTrue(planNode instanceof InsertTabletNode);
+      Assert.assertEquals(5, ((InsertTabletNode) planNode).getSearchIndex());
+    }
+    Assert.assertFalse(iterator.hasNext());
+  }
+
+  @Test
+  public void scenario01TestGetReqIterator06() throws Exception {
+    simulateFileScenario01();
+    ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(5);
+    Assert.assertFalse(iterator.hasNext());
+  }
+
+  @Test
+  public void scenario01TestGetReqIterator07() throws Exception {
+    simulateFileScenario01();
+    ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(6);
     Assert.assertFalse(iterator.hasNext());
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/recover/WALRecoverManagerTest.java b/server/src/test/java/org/apache/iotdb/db/wal/recover/WALRecoverManagerTest.java
index 7757d171df..049a8a3c5f 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/recover/WALRecoverManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/recover/WALRecoverManagerTest.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.wal.buffer.IWALBuffer;
 import org.apache.iotdb.db.wal.buffer.WALBuffer;
 import org.apache.iotdb.db.wal.buffer.WALEntry;
+import org.apache.iotdb.db.wal.buffer.WALInfoEntry;
 import org.apache.iotdb.db.wal.checkpoint.CheckpointManager;
 import org.apache.iotdb.db.wal.checkpoint.MemTableInfo;
 import org.apache.iotdb.db.wal.recover.file.UnsealedTsFileRecoverPerformer;
@@ -178,7 +179,7 @@ public class WALRecoverManagerTest {
             try {
               while (walBuffer.getCurrentWALFileVersion() - firstWALVersionId < 2) {
                 WALEntry walEntry =
-                    new WALEntry(
+                    new WALInfoEntry(
                         memTableId, getInsertTabletPlan(SG_NAME.concat("test_d" + memTableId)));
                 walBuffer.write(walEntry);
               }
@@ -204,7 +205,7 @@ public class WALRecoverManagerTest {
     long firstValidVersionId = walBuffer.getCurrentWALFileVersion();
     IMemTable targetMemTable = new PrimitiveMemTable();
     WALEntry walEntry =
-        new WALEntry(targetMemTable.getMemTableId(), getInsertRowPlan(DEVICE2_NAME, 4L), true);
+        new WALInfoEntry(targetMemTable.getMemTableId(), getInsertRowPlan(DEVICE2_NAME, 4L), true);
     walBuffer.write(walEntry);
     walEntry.getWalFlushListener().waitForResult();
     // write .checkpoint file
@@ -236,7 +237,7 @@ public class WALRecoverManagerTest {
             try {
               while (walBuffer.getCurrentWALFileVersion() - firstWALVersionId < 2) {
                 WALEntry walEntry =
-                    new WALEntry(
+                    new WALInfoEntry(
                         memTableId, getInsertTabletPlan(SG_NAME.concat("test_d" + memTableId)));
                 walBuffer.write(walEntry);
               }
@@ -264,11 +265,11 @@ public class WALRecoverManagerTest {
     InsertRowPlan insertRowPlan = getInsertRowPlan(DEVICE2_NAME, 4L);
     targetMemTable.insert(insertRowPlan);
 
-    WALEntry walEntry = new WALEntry(targetMemTable.getMemTableId(), insertRowPlan, true);
+    WALEntry walEntry = new WALInfoEntry(targetMemTable.getMemTableId(), insertRowPlan, true);
     walBuffer.write(walEntry);
     walEntry.getWalFlushListener().waitForResult();
 
-    walEntry = new WALEntry(targetMemTable.getMemTableId(), targetMemTable, true);
+    walEntry = new WALInfoEntry(targetMemTable.getMemTableId(), targetMemTable, true);
     walBuffer.write(walEntry);
     walEntry.getWalFlushListener().waitForResult();
     // write .checkpoint file
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformerTest.java b/server/src/test/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformerTest.java
index f5ff449761..d6ebf64d70 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformerTest.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.wal.buffer.WALEntry;
+import org.apache.iotdb.db.wal.buffer.WALInfoEntry;
 import org.apache.iotdb.db.wal.utils.TsFileUtilsForRecoverTest;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
@@ -126,7 +127,7 @@ public class UnsealedTsFileRecoverPerformerTest {
         new InsertRowPlan(
             new PartialPath(DEVICE2_NAME), time, new String[] {"s1", "s2"}, dataTypes, columns);
     int fakeMemTableId = 1;
-    WALEntry walEntry = new WALEntry(fakeMemTableId, insertRowPlan);
+    WALEntry walEntry = new WALInfoEntry(fakeMemTableId, insertRowPlan);
     // recover
     tsFileResource = new TsFileResource(file);
     // vsg processor is used to test IdTable, don't test IdTable here
@@ -181,7 +182,7 @@ public class UnsealedTsFileRecoverPerformerTest {
     DeletePlan deletePlan =
         new DeletePlan(Long.MIN_VALUE, Long.MAX_VALUE, new PartialPath(DEVICE2_NAME));
     int fakeMemTableId = 1;
-    WALEntry walEntry = new WALEntry(fakeMemTableId, deletePlan);
+    WALEntry walEntry = new WALInfoEntry(fakeMemTableId, deletePlan);
     // recover
     tsFileResource = new TsFileResource(file);
     // vsg processor is used to test IdTable, don't test IdTable here
@@ -306,8 +307,8 @@ public class UnsealedTsFileRecoverPerformerTest {
     insertTabletNode.markFailedMeasurement(0, new Exception());
 
     int fakeMemTableId = 1;
-    WALEntry walEntry1 = new WALEntry(fakeMemTableId++, insertRowNode);
-    WALEntry walEntry2 = new WALEntry(fakeMemTableId, insertTabletNode);
+    WALEntry walEntry1 = new WALInfoEntry(fakeMemTableId++, insertRowNode);
+    WALEntry walEntry2 = new WALInfoEntry(fakeMemTableId, insertTabletNode);
     // recover
     tsFileResource = new TsFileResource(file);
     // vsg processor is used to test IdTable, don't test IdTable here
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/utils/WALByteBufferForTest.java b/server/src/test/java/org/apache/iotdb/db/wal/utils/WALByteBufferForTest.java
index a46f783e73..765ed1a03e 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/utils/WALByteBufferForTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/utils/WALByteBufferForTest.java
@@ -69,6 +69,11 @@ public class WALByteBufferForTest implements IWALByteBufferView {
     buffer.putDouble(value);
   }
 
+  @Override
+  public int position() {
+    return buffer.position();
+  }
+
   public ByteBuffer getBuffer() {
     return buffer;
   }
diff --git a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
index 5e2836c5a6..d36601be1c 100644
--- a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
+++ b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
@@ -22,6 +22,7 @@ namespace java org.apache.iotdb.consensus.multileader.thrift
 
 struct TLogBatch {
   1: required binary data
+  2: required bool fromWAL
 }
 
 struct TSyncLogReq {