You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/07/01 02:37:36 UTC

[iotdb] branch master updated: Fix the issues in MultiLeader log dispatcher (#6484)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new deb9732b43 Fix the issues in MultiLeader log dispatcher (#6484)
deb9732b43 is described below

commit deb9732b43c6f6dbb4e803bdc66a89c0f2911d1c
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Fri Jul 1 10:37:31 2022 +0800

    Fix the issues in MultiLeader log dispatcher (#6484)
---
 .../multileader/MultiLeaderServerImpl.java         |  42 ++++-
 .../multileader/logdispatcher/IndexController.java |  67 ++++---
 .../multileader/logdispatcher/LogDispatcher.java   |  80 ++++++--
 .../multileader/wal/ConsensusReqReader.java        |   6 +-
 .../multileader/MultiLeaderConsensusTest.java      | 209 ++-------------------
 .../iotdb/consensus/multileader/RecoveryTest.java  |   4 +-
 .../logdispatcher/IndexControllerTest.java         |  38 +---
 .../multileader/logdispatcher/SyncStatusTest.java  |   8 +-
 .../multileader/util/FakeConsensusReqReader.java   | 111 +++++++++++
 .../consensus/multileader/util/RequestSets.java    |  55 ++++++
 .../consensus/multileader/util/TestEntry.java      |  74 ++++++++
 .../multileader/util/TestStateMachine.java         |  93 +++++++++
 .../java/org/apache/iotdb/db/tools/WalChecker.java |  15 +-
 .../iotdb/db/wal/buffer/AbstractWALBuffer.java     |   5 +
 .../org/apache/iotdb/db/wal/buffer/IWALBuffer.java |   3 +
 .../org/apache/iotdb/db/wal/node/WALFakeNode.java  |   5 +
 .../java/org/apache/iotdb/db/wal/node/WALNode.java |  66 +++----
 .../iotdb/db/wal/node/ConsensusReqReaderTest.java  |  40 ++--
 18 files changed, 574 insertions(+), 347 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 272c244a1e..93f9cf567b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -30,10 +30,10 @@ import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.config.MultiLeaderConfig;
 import org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClient;
-import org.apache.iotdb.consensus.multileader.logdispatcher.IndexController;
 import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher;
 import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
-import org.apache.iotdb.consensus.ratis.Utils;
+import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import org.slf4j.Logger;
@@ -46,6 +46,7 @@ import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class MultiLeaderServerImpl {
 
@@ -57,7 +58,7 @@ public class MultiLeaderServerImpl {
   private final IStateMachine stateMachine;
   private final String storageDir;
   private final List<Peer> configuration;
-  private final IndexController controller;
+  private final AtomicLong index;
   private final LogDispatcher logDispatcher;
   private final MultiLeaderConfig config;
 
@@ -71,8 +72,6 @@ public class MultiLeaderServerImpl {
     this.storageDir = storageDir;
     this.thisNode = thisNode;
     this.stateMachine = stateMachine;
-    this.controller =
-        new IndexController(storageDir, Utils.fromTEndPointToString(thisNode.getEndpoint()), true);
     this.configuration = configuration;
     if (configuration.isEmpty()) {
       recoverConfiguration();
@@ -81,6 +80,11 @@ public class MultiLeaderServerImpl {
     }
     this.config = config;
     this.logDispatcher = new LogDispatcher(this, clientManager);
+    // on restart, resume the index from the reader's current search index
+    ConsensusReqReader reader =
+        (ConsensusReqReader) stateMachine.read(new GetConsensusReqReaderPlan());
+    long currentSearchIndex = reader.getCurrentSearchIndex();
+    this.index = new AtomicLong(currentSearchIndex);
   }
 
   public IStateMachine getStateMachine() {
@@ -104,8 +108,26 @@ public class MultiLeaderServerImpl {
     synchronized (stateMachine) {
       IndexedConsensusRequest indexedConsensusRequest =
           buildIndexedConsensusRequestForLocalRequest(request);
+      if (indexedConsensusRequest.getSearchIndex() % 1000 == 0) {
+        logger.info(
+            "DataRegion[{}]: index after build: safeIndex: {}, searchIndex: {}",
+            thisNode.getGroupId(),
+            indexedConsensusRequest.getSafelyDeletedSearchIndex(),
+            indexedConsensusRequest.getSearchIndex());
+      }
+      // TODO wal and memtable
       TSStatus result = stateMachine.write(indexedConsensusRequest);
-      logDispatcher.offer(indexedConsensusRequest);
+      if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        logDispatcher.offer(indexedConsensusRequest);
+      } else {
+        logger.debug(
+            "{}: write operation failed. searchIndex: {}. Code: {}",
+            thisNode.getGroupId(),
+            indexedConsensusRequest.getSearchIndex(),
+            result.getCode());
+        index.decrementAndGet();
+      }
+
       return result;
     }
   }
@@ -156,7 +178,7 @@ public class MultiLeaderServerImpl {
   public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(
       IConsensusRequest request) {
     return new IndexedConsensusRequest(
-        controller.incrementAndGet(), getCurrentSafelyDeletedSearchIndex(), request);
+        index.incrementAndGet(), getCurrentSafelyDeletedSearchIndex(), request);
   }
 
   public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest(
@@ -170,7 +192,7 @@ public class MultiLeaderServerImpl {
    * single copies, the current index is selected
    */
   public long getCurrentSafelyDeletedSearchIndex() {
-    return logDispatcher.getMinSyncIndex().orElseGet(controller::getCurrentIndex);
+    return logDispatcher.getMinSyncIndex().orElseGet(index::get);
   }
 
   public String getStorageDir() {
@@ -185,8 +207,8 @@ public class MultiLeaderServerImpl {
     return configuration;
   }
 
-  public IndexController getController() {
-    return controller;
+  public long getIndex() {
+    return index.get();
   }
 
   public MultiLeaderConfig getConfig() {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
index 6a7396076d..064f6841e0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
@@ -30,6 +30,7 @@ import javax.annotation.concurrent.ThreadSafe;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /** An index controller class to balance the performance degradation of frequent disk I/O. */
 @ThreadSafe
@@ -39,42 +40,56 @@ public class IndexController {
 
   public static final int FLUSH_INTERVAL = 500;
 
-  private volatile long lastFlushedIndex;
-  private volatile long currentIndex;
+  private long lastFlushedIndex;
+  private long currentIndex;
+
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
   private final String storageDir;
   private final String prefix;
-  // Indicates whether currentIndex needs to be incremented by FLUSH_INTERVAL interval after restart
-  private final boolean incrementIntervalAfterRestart;
 
-  public IndexController(String storageDir, String prefix, boolean incrementIntervalAfterRestart) {
+  public IndexController(String storageDir, String prefix) {
     this.storageDir = storageDir;
     this.prefix = prefix + '-';
-    this.incrementIntervalAfterRestart = incrementIntervalAfterRestart;
     restore();
   }
 
-  public synchronized long incrementAndGet() {
-    currentIndex++;
-    checkPersist();
-    return currentIndex;
+  public long incrementAndGet() {
+    try {
+      lock.writeLock().lock();
+      currentIndex++;
+      checkPersist();
+      return currentIndex;
+    } finally {
+      lock.writeLock().unlock();
+    }
   }
 
-  public synchronized long updateAndGet(long index) {
-    long newCurrentIndex = Math.max(currentIndex, index);
-    logger.debug(
-        "update index from currentIndex {} to {} for file prefix {} in {}",
-        currentIndex,
-        newCurrentIndex,
-        prefix,
-        storageDir);
-    currentIndex = newCurrentIndex;
-    checkPersist();
-    return currentIndex;
+  public long updateAndGet(long index) {
+    try {
+      lock.writeLock().lock();
+      long newCurrentIndex = Math.max(currentIndex, index);
+      logger.debug(
+          "update index from currentIndex {} to {} for file prefix {} in {}",
+          currentIndex,
+          newCurrentIndex,
+          prefix,
+          storageDir);
+      currentIndex = newCurrentIndex;
+      checkPersist();
+      return currentIndex;
+    } finally {
+      lock.writeLock().unlock();
+    }
   }
 
   public long getCurrentIndex() {
-    return currentIndex;
+    try {
+      lock.readLock().lock();
+      return currentIndex;
+    } finally {
+      lock.readLock().unlock();
+    }
   }
 
   @TestOnly
@@ -131,13 +146,7 @@ public class IndexController {
           }
         }
       }
-      if (incrementIntervalAfterRestart) {
-        // prevent overlapping in case of failure
-        currentIndex = lastFlushedIndex + FLUSH_INTERVAL;
-        persist();
-      } else {
-        currentIndex = lastFlushedIndex;
-      }
+      currentIndex = lastFlushedIndex;
     } else {
       versionFile = new File(directory, prefix + "0");
       try {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 1d651a0ba5..ae60598061 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.config.MultiLeaderConfig;
 import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
@@ -122,7 +121,7 @@ public class LogDispatcher {
   }
 
   public class LogDispatcherThread implements Runnable {
-
+    private static final long PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC = 10;
     private final MultiLeaderConfig config;
     private final Peer peer;
     private final IndexController controller;
@@ -137,6 +136,9 @@ public class LogDispatcher {
         (ConsensusReqReader) impl.getStateMachine().read(new GetConsensusReqReaderPlan());
     private volatile boolean stopped = false;
 
+    private ConsensusReqReader.ReqIterator walEntryiterator;
+    private long iteratorIndex = 1;
+
     public LogDispatcherThread(Peer peer, MultiLeaderConfig config) {
       this.peer = peer;
       this.config = config;
@@ -144,8 +146,9 @@ public class LogDispatcher {
           new ArrayBlockingQueue<>(config.getReplication().getMaxPendingRequestNumPerNode());
       this.controller =
           new IndexController(
-              impl.getStorageDir(), Utils.fromTEndPointToString(peer.getEndpoint()), false);
+              impl.getStorageDir(), Utils.fromTEndPointToString(peer.getEndpoint()));
       this.syncStatus = new SyncStatus(controller, config);
+      this.walEntryiterator = reader.getReqIterator(iteratorIndex);
     }
 
     public IndexController getController() {
@@ -184,10 +187,14 @@ public class LogDispatcher {
         while (!Thread.interrupted() && !stopped) {
           while ((batch = getBatch()).isEmpty()) {
             // we may block here if there is no requests in the queue
-            bufferedRequest.add(pendingRequest.take());
-            // If write pressure is low, we simply sleep a little to reduce the number of RPC
-            if (pendingRequest.size() <= config.getReplication().getMaxRequestPerBatch()) {
-              Thread.sleep(config.getReplication().getMaxWaitingTimeForAccumulatingBatchInMs());
+            IndexedConsensusRequest request =
+                pendingRequest.poll(PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC, TimeUnit.SECONDS);
+            if (request != null) {
+              bufferedRequest.add(request);
+              // If write pressure is low, we simply sleep a little to reduce the number of RPC
+              if (pendingRequest.size() <= config.getReplication().getMaxRequestPerBatch()) {
+                Thread.sleep(config.getReplication().getMaxWaitingTimeForAccumulatingBatchInMs());
+              }
             }
           }
           // we may block here if the synchronization pipeline is full
@@ -207,20 +214,36 @@ public class LogDispatcher {
       PendingBatch batch;
       List<TLogBatch> logBatches = new ArrayList<>();
       long startIndex = syncStatus.getNextSendingIndex();
-      long maxIndex = impl.getController().getCurrentIndex() + 1;
+      logger.debug("get batch. startIndex: {}", startIndex);
       long endIndex;
       if (bufferedRequest.size() <= config.getReplication().getMaxRequestPerBatch()) {
         // Use drainTo instead of poll to reduce lock overhead
+        logger.debug(
+            "{} : pendingRequest Size: {}, bufferedRequest size: {}",
+            impl.getThisNode().getGroupId(),
+            pendingRequest.size(),
+            bufferedRequest.size());
         pendingRequest.drainTo(
             bufferedRequest,
             config.getReplication().getMaxRequestPerBatch() - bufferedRequest.size());
+        // remove all requests whose searchIndex < startIndex
+        Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
+        while (iterator.hasNext()) {
+          IndexedConsensusRequest request = iterator.next();
+          if (request.getSearchIndex() < startIndex) {
+            iterator.remove();
+          } else {
+            break;
+          }
+        }
       }
-      if (bufferedRequest.isEmpty()) {
-        // only execute this after a restart
-        endIndex = constructBatchFromWAL(startIndex, maxIndex, logBatches);
+      if (bufferedRequest.isEmpty()) { // only execute this after a restart
+        endIndex = constructBatchFromWAL(startIndex, impl.getIndex() + 1, logBatches);
         batch = new PendingBatch(startIndex, endIndex, logBatches);
-        logger.debug("{} : accumulated a {} from wal", impl.getThisNode().getGroupId(), batch);
+        logger.debug(
+            "{} : accumulated a {} from wal when empty", impl.getThisNode().getGroupId(), batch);
       } else {
+        // Note that prev's searchIndex >= startIndex here
         Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
         IndexedConsensusRequest prev = iterator.next();
         // Prevents gap between logs. For example, some requests are not written into the queue when
@@ -245,7 +268,7 @@ public class LogDispatcher {
             if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) {
               batch = new PendingBatch(startIndex, endIndex, logBatches);
               logger.debug(
-                  "{} : accumulated a {} from queue and wal",
+                  "gap {} : accumulated a {} from queue and wal when gap",
                   impl.getThisNode().getGroupId(),
                   batch);
               return batch;
@@ -283,15 +306,36 @@ public class LogDispatcher {
 
     private long constructBatchFromWAL(
         long currentIndex, long maxIndex, List<TLogBatch> logBatches) {
+      logger.debug(
+          String.format(
+              "DataRegion[%s]->%s: currentIndex: %d, maxIndex: %d, iteratorIndex: %d",
+              peer.getGroupId().getId(),
+              peer.getEndpoint().ip,
+              currentIndex,
+              maxIndex,
+              iteratorIndex));
+      if (iteratorIndex != currentIndex) {
+        walEntryiterator.skipTo(currentIndex);
+        iteratorIndex = currentIndex;
+      }
       while (currentIndex < maxIndex
           && logBatches.size() < config.getReplication().getMaxRequestPerBatch()) {
-        // TODO iterator
-        IConsensusRequest data = reader.getReq(currentIndex++);
-        if (data != null) {
-          logBatches.add(new TLogBatch(data.serializeToByteBuffer()));
+        logger.debug("construct from WAL for one Entry, index : {}", currentIndex);
+        try {
+          walEntryiterator.waitForNextReady();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          logger.warn("wait for next WAL entry is interrupted");
+        }
+        IndexedConsensusRequest data = walEntryiterator.next();
+        currentIndex = data.getSearchIndex();
+        iteratorIndex = currentIndex;
+        logBatches.add(new TLogBatch(data.serializeToByteBuffer()));
+        if (currentIndex == maxIndex - 1) {
+          break;
         }
       }
-      return currentIndex - 1;
+      return currentIndex;
     }
 
     private void constructBatchIndexedFromConsensusRequest(
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java
index 5880e5de2d..a39e3186a4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.consensus.multileader.wal;
 
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 
 import java.util.Iterator;
 import java.util.List;
@@ -68,7 +69,7 @@ public interface ConsensusReqReader {
      * @throws java.util.NoSuchElementException if the iteration has no more elements, wait a moment
      *     or call {@link this#waitForNextReady} for more elements
      */
-    IConsensusRequest next();
+    IndexedConsensusRequest next();
 
     /**
      * Wait for the next element in the iteration ready, blocked until next element is available.
@@ -89,4 +90,7 @@ public interface ConsensusReqReader {
      */
     void skipTo(long targetIndex);
   }
+
+  /** Get current search index */
+  long getCurrentSearchIndex();
 }
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
index 320d4d3d27..771b600c69 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
@@ -20,22 +20,15 @@
 package org.apache.iotdb.consensus.multileader;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.consensus.ConsensusFactory;
-import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.ConsensusGroup;
-import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.multileader.logdispatcher.IndexController;
-import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
-import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
-import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.consensus.multileader.util.TestEntry;
+import org.apache.iotdb.consensus.multileader.util.TestStateMachine;
 
 import org.apache.ratis.util.FileUtils;
 import org.junit.After;
@@ -45,17 +38,11 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
 public class MultiLeaderConsensusTest {
 
@@ -133,29 +120,19 @@ public class MultiLeaderConsensusTest {
     servers.get(1).addConsensusGroup(group.getGroupId(), group.getPeers());
     servers.get(2).addConsensusGroup(group.getGroupId(), group.getPeers());
 
-    Assert.assertEquals(0, servers.get(0).getImpl(gid).getController().getCurrentIndex());
-    Assert.assertEquals(0, servers.get(1).getImpl(gid).getController().getCurrentIndex());
-    Assert.assertEquals(0, servers.get(2).getImpl(gid).getController().getCurrentIndex());
+    Assert.assertEquals(0, servers.get(0).getImpl(gid).getIndex());
+    Assert.assertEquals(0, servers.get(1).getImpl(gid).getIndex());
+    Assert.assertEquals(0, servers.get(2).getImpl(gid).getIndex());
 
     for (int i = 0; i < IndexController.FLUSH_INTERVAL; i++) {
       servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
       servers.get(1).write(gid, new TestEntry(i, peers.get(1)));
       servers.get(2).write(gid, new TestEntry(i, peers.get(2)));
-      Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getController().getCurrentIndex());
-      Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getController().getCurrentIndex());
-      Assert.assertEquals(i + 1, servers.get(2).getImpl(gid).getController().getCurrentIndex());
+      Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getIndex());
+      Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getIndex());
+      Assert.assertEquals(i + 1, servers.get(2).getImpl(gid).getIndex());
     }
 
-    Assert.assertEquals(
-        IndexController.FLUSH_INTERVAL,
-        servers.get(0).getImpl(gid).getController().getLastFlushedIndex());
-    Assert.assertEquals(
-        IndexController.FLUSH_INTERVAL,
-        servers.get(1).getImpl(gid).getController().getLastFlushedIndex());
-    Assert.assertEquals(
-        IndexController.FLUSH_INTERVAL,
-        servers.get(2).getImpl(gid).getController().getLastFlushedIndex());
-
     for (int i = 0; i < 3; i++) {
       long start = System.currentTimeMillis();
       while (servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex()
@@ -193,15 +170,9 @@ public class MultiLeaderConsensusTest {
     Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration());
     Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration());
 
-    Assert.assertEquals(
-        IndexController.FLUSH_INTERVAL * 2,
-        servers.get(0).getImpl(gid).getController().getCurrentIndex());
-    Assert.assertEquals(
-        IndexController.FLUSH_INTERVAL * 2,
-        servers.get(1).getImpl(gid).getController().getCurrentIndex());
-    Assert.assertEquals(
-        IndexController.FLUSH_INTERVAL * 2,
-        servers.get(2).getImpl(gid).getController().getCurrentIndex());
+    Assert.assertEquals(IndexController.FLUSH_INTERVAL, servers.get(0).getImpl(gid).getIndex());
+    Assert.assertEquals(IndexController.FLUSH_INTERVAL, servers.get(1).getImpl(gid).getIndex());
+    Assert.assertEquals(IndexController.FLUSH_INTERVAL, servers.get(2).getImpl(gid).getIndex());
 
     for (int i = 0; i < 3; i++) {
       long start = System.currentTimeMillis();
@@ -217,7 +188,7 @@ public class MultiLeaderConsensusTest {
 
     Assert.assertEquals(
         IndexController.FLUSH_INTERVAL,
-        servers.get(1).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
+        servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
     Assert.assertEquals(
         IndexController.FLUSH_INTERVAL,
         servers.get(1).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
@@ -237,23 +208,16 @@ public class MultiLeaderConsensusTest {
     servers.get(0).addConsensusGroup(group.getGroupId(), group.getPeers());
     servers.get(1).addConsensusGroup(group.getGroupId(), group.getPeers());
 
-    Assert.assertEquals(0, servers.get(0).getImpl(gid).getController().getCurrentIndex());
-    Assert.assertEquals(0, servers.get(1).getImpl(gid).getController().getCurrentIndex());
+    Assert.assertEquals(0, servers.get(0).getImpl(gid).getIndex());
+    Assert.assertEquals(0, servers.get(1).getImpl(gid).getIndex());
 
     for (int i = 0; i < IndexController.FLUSH_INTERVAL; i++) {
       servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
       servers.get(1).write(gid, new TestEntry(i, peers.get(1)));
-      Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getController().getCurrentIndex());
-      Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getController().getCurrentIndex());
+      Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getIndex());
+      Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getIndex());
     }
 
-    Assert.assertEquals(
-        IndexController.FLUSH_INTERVAL,
-        servers.get(0).getImpl(gid).getController().getLastFlushedIndex());
-    Assert.assertEquals(
-        IndexController.FLUSH_INTERVAL,
-        servers.get(1).getImpl(gid).getController().getLastFlushedIndex());
-
     Assert.assertEquals(0, servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
     Assert.assertEquals(0, servers.get(1).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
 
@@ -266,13 +230,9 @@ public class MultiLeaderConsensusTest {
     Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration());
     Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration());
 
-    Assert.assertEquals(
-        IndexController.FLUSH_INTERVAL * 2,
-        servers.get(0).getImpl(gid).getController().getCurrentIndex());
-    Assert.assertEquals(
-        IndexController.FLUSH_INTERVAL * 2,
-        servers.get(1).getImpl(gid).getController().getCurrentIndex());
-    Assert.assertEquals(0, servers.get(2).getImpl(gid).getController().getCurrentIndex());
+    Assert.assertEquals(IndexController.FLUSH_INTERVAL, servers.get(0).getImpl(gid).getIndex());
+    Assert.assertEquals(IndexController.FLUSH_INTERVAL, servers.get(1).getImpl(gid).getIndex());
+    Assert.assertEquals(0, servers.get(2).getImpl(gid).getIndex());
 
     for (int i = 0; i < 2; i++) {
       long start = System.currentTimeMillis();
@@ -299,135 +259,4 @@ public class MultiLeaderConsensusTest {
     Assert.assertEquals(stateMachines.get(0).getData(), stateMachines.get(1).getData());
     Assert.assertEquals(stateMachines.get(2).getData(), stateMachines.get(1).getData());
   }
-
-  private static class TestEntry implements IConsensusRequest {
-
-    private final int num;
-    private final Peer peer;
-
-    public TestEntry(int num, Peer peer) {
-      this.num = num;
-      this.peer = peer;
-    }
-
-    @Override
-    public ByteBuffer serializeToByteBuffer() {
-      try (PublicBAOS publicBAOS = new PublicBAOS();
-          DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
-        outputStream.writeInt(num);
-        peer.serialize(outputStream);
-        return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      TestEntry testEntry = (TestEntry) o;
-      return num == testEntry.num && Objects.equals(peer, testEntry.peer);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(num, peer);
-    }
-
-    @Override
-    public String toString() {
-      return "TestEntry{" + "num=" + num + ", peer=" + peer + '}';
-    }
-  }
-
-  private static class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
-
-    private final Set<IndexedConsensusRequest> requestSet = ConcurrentHashMap.newKeySet();
-
-    public Set<IndexedConsensusRequest> getRequestSet() {
-      return requestSet;
-    }
-
-    public Set<TestEntry> getData() {
-      Set<TestEntry> data = new HashSet<>();
-      requestSet.forEach(x -> data.add((TestEntry) x.getRequest()));
-      return data;
-    }
-
-    @Override
-    public void start() {}
-
-    @Override
-    public void stop() {}
-
-    @Override
-    public TSStatus write(IConsensusRequest request) {
-      synchronized (requestSet) {
-        IConsensusRequest innerRequest = ((IndexedConsensusRequest) request).getRequest();
-        if (innerRequest instanceof ByteBufferConsensusRequest) {
-          ByteBuffer buffer = innerRequest.serializeToByteBuffer();
-          requestSet.add(
-              new IndexedConsensusRequest(
-                  ((IndexedConsensusRequest) request).getSearchIndex(),
-                  -1,
-                  new TestEntry(buffer.getInt(), Peer.deserialize(buffer))));
-        } else {
-          requestSet.add(((IndexedConsensusRequest) request));
-        }
-        return new TSStatus();
-      }
-    }
-
-    @Override
-    public synchronized DataSet read(IConsensusRequest request) {
-      if (request instanceof GetConsensusReqReaderPlan) {
-        return new FakeConsensusReqReader(requestSet);
-      }
-      return null;
-    }
-
-    @Override
-    public boolean takeSnapshot(File snapshotDir) {
-      return false;
-    }
-
-    @Override
-    public void loadSnapshot(File latestSnapshotRootDir) {}
-  }
-
-  public static class FakeConsensusReqReader implements ConsensusReqReader, DataSet {
-
-    private final Set<IndexedConsensusRequest> requestSet;
-
-    public FakeConsensusReqReader(Set<IndexedConsensusRequest> requestSet) {
-      this.requestSet = requestSet;
-    }
-
-    @Override
-    public IConsensusRequest getReq(long index) {
-      synchronized (requestSet) {
-        for (IndexedConsensusRequest indexedConsensusRequest : requestSet) {
-          if (indexedConsensusRequest.getSearchIndex() == index) {
-            return indexedConsensusRequest;
-          }
-        }
-        return null;
-      }
-    }
-
-    @Override
-    public List<IConsensusRequest> getReqs(long startIndex, int num) {
-      return null;
-    }
-
-    @Override
-    public ReqIterator getReqIterator(long startIndex) {
-      return null;
-    }
-  }
 }
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/RecoveryTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/RecoveryTest.java
index b9000b7337..fb28a78058 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/RecoveryTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/RecoveryTest.java
@@ -23,11 +23,11 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.consensus.ConsensusFactory;
-import org.apache.iotdb.consensus.EmptyStateMachine;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.multileader.util.TestStateMachine;
 
 import org.apache.ratis.util.FileUtils;
 import org.junit.After;
@@ -52,7 +52,7 @@ public class RecoveryTest {
                     .setThisNode(new TEndPoint("0.0.0.0", 9000))
                     .setStorageDir("target" + java.io.File.separator + "recovery")
                     .build(),
-                gid -> new EmptyStateMachine())
+                gid -> new TestStateMachine())
             .orElseThrow(
                 () ->
                     new IllegalArgumentException(
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
index c9bf6051b0..aa6f63783f 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
@@ -43,38 +43,10 @@ public class IndexControllerTest {
     FileUtils.deleteFully(storageDir);
   }
 
-  /** test indexController when incrementIntervalAfterRestart == true */
-  @Test
-  public void testTrueIncrementIntervalAfterRestart() {
-    IndexController controller = new IndexController(storageDir.getAbsolutePath(), prefix, true);
-    Assert.assertEquals(0, controller.getCurrentIndex());
-    Assert.assertEquals(0, controller.getLastFlushedIndex());
-
-    for (int i = 0; i < IndexController.FLUSH_INTERVAL - 1; i++) {
-      controller.incrementAndGet();
-    }
-    Assert.assertEquals(IndexController.FLUSH_INTERVAL - 1, controller.getCurrentIndex());
-    Assert.assertEquals(0, controller.getLastFlushedIndex());
-
-    controller = new IndexController(storageDir.getAbsolutePath(), prefix, true);
-    Assert.assertEquals(IndexController.FLUSH_INTERVAL, controller.getCurrentIndex());
-    Assert.assertEquals(IndexController.FLUSH_INTERVAL, controller.getLastFlushedIndex());
-
-    for (int i = 0; i < IndexController.FLUSH_INTERVAL + 1; i++) {
-      controller.incrementAndGet();
-    }
-    Assert.assertEquals(IndexController.FLUSH_INTERVAL * 2 + 1, controller.getCurrentIndex());
-    Assert.assertEquals(IndexController.FLUSH_INTERVAL * 2, controller.getLastFlushedIndex());
-
-    controller = new IndexController(storageDir.getAbsolutePath(), prefix, true);
-    Assert.assertEquals(IndexController.FLUSH_INTERVAL * 3, controller.getCurrentIndex());
-    Assert.assertEquals(IndexController.FLUSH_INTERVAL * 3, controller.getLastFlushedIndex());
-  }
-
   /** test indexController when incrementIntervalAfterRestart == false */
   @Test
-  public void testFalseIncrementIntervalAfterRestart() {
-    IndexController controller = new IndexController(storageDir.getAbsolutePath(), prefix, false);
+  public void testIncrementIntervalAfterRestart() {
+    IndexController controller = new IndexController(storageDir.getAbsolutePath(), prefix);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -83,7 +55,7 @@ public class IndexControllerTest {
     Assert.assertEquals(IndexController.FLUSH_INTERVAL - 1, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
-    controller = new IndexController(storageDir.getAbsolutePath(), prefix, false);
+    controller = new IndexController(storageDir.getAbsolutePath(), prefix);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -91,7 +63,7 @@ public class IndexControllerTest {
     Assert.assertEquals(IndexController.FLUSH_INTERVAL + 1, controller.getCurrentIndex());
     Assert.assertEquals(IndexController.FLUSH_INTERVAL, controller.getLastFlushedIndex());
 
-    controller = new IndexController(storageDir.getAbsolutePath(), prefix, false);
+    controller = new IndexController(storageDir.getAbsolutePath(), prefix);
     Assert.assertEquals(IndexController.FLUSH_INTERVAL, controller.getCurrentIndex());
     Assert.assertEquals(IndexController.FLUSH_INTERVAL, controller.getLastFlushedIndex());
 
@@ -99,7 +71,7 @@ public class IndexControllerTest {
     Assert.assertEquals(IndexController.FLUSH_INTERVAL * 2 - 1, controller.getCurrentIndex());
     Assert.assertEquals(IndexController.FLUSH_INTERVAL, controller.getLastFlushedIndex());
 
-    controller = new IndexController(storageDir.getAbsolutePath(), prefix, false);
+    controller = new IndexController(storageDir.getAbsolutePath(), prefix);
     Assert.assertEquals(IndexController.FLUSH_INTERVAL, controller.getCurrentIndex());
     Assert.assertEquals(IndexController.FLUSH_INTERVAL, controller.getLastFlushedIndex());
 
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
index cbf015257e..d243a80e77 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
@@ -54,7 +54,7 @@ public class SyncStatusTest {
   /** Confirm success from front to back */
   @Test
   public void sequenceTest() throws InterruptedException {
-    IndexController controller = new IndexController(storageDir.getAbsolutePath(), prefix, true);
+    IndexController controller = new IndexController(storageDir.getAbsolutePath(), prefix);
     Assert.assertEquals(0, controller.getCurrentIndex());
 
     SyncStatus status = new SyncStatus(controller, config);
@@ -79,7 +79,7 @@ public class SyncStatusTest {
   /** Confirm success from back to front */
   @Test
   public void reverseTest() throws InterruptedException {
-    IndexController controller = new IndexController(storageDir.getAbsolutePath(), prefix, true);
+    IndexController controller = new IndexController(storageDir.getAbsolutePath(), prefix);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -111,7 +111,7 @@ public class SyncStatusTest {
   /** Confirm success first from front to back, then back to front */
   @Test
   public void mixedTest() throws InterruptedException {
-    IndexController controller = new IndexController(storageDir.getAbsolutePath(), prefix, true);
+    IndexController controller = new IndexController(storageDir.getAbsolutePath(), prefix);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -153,7 +153,7 @@ public class SyncStatusTest {
   /** Test Blocking while addNextBatch */
   @Test
   public void waitTest() throws InterruptedException, ExecutionException {
-    IndexController controller = new IndexController(storageDir.getAbsolutePath(), prefix, true);
+    IndexController controller = new IndexController(storageDir.getAbsolutePath(), prefix);
     Assert.assertEquals(0, controller.getCurrentIndex());
 
     SyncStatus status = new SyncStatus(controller, config);
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/FakeConsensusReqReader.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/FakeConsensusReqReader.java
new file mode 100644
index 0000000000..01d90b62cc
--- /dev/null
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/FakeConsensusReqReader.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.util;
+
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class FakeConsensusReqReader implements ConsensusReqReader, DataSet { // in-memory test reader over a shared RequestSets
+
+  private final RequestSets requestSets; // backing store; also the monitor used by the synchronized blocks below
+
+  public FakeConsensusReqReader(RequestSets requestSets) {
+    this.requestSets = requestSets;
+  }
+
+  @Override
+  public IConsensusRequest getReq(long index) { // linear scan for the request whose search index equals index
+    synchronized (requestSets) {
+      for (IndexedConsensusRequest indexedConsensusRequest : requestSets.getRequestSet()) {
+        if (indexedConsensusRequest.getSearchIndex() == index) {
+          return indexedConsensusRequest;
+        }
+      }
+      return null; // no stored request carries this search index
+    }
+  }
+
+  @Override
+  public List<IConsensusRequest> getReqs(long startIndex, int num) {
+    return null; // batch lookup is not implemented by this fake
+  }
+
+  @Override
+  public ReqIterator getReqIterator(long startIndex) {
+    return new FakeConsensusReqIterator(startIndex); // each call creates an independent cursor starting at startIndex
+  }
+
+  @Override
+  public long getCurrentSearchIndex() {
+    return requestSets.getLocalRequestNumber(); // number of requests that were added with local == true
+  }
+
+  private class FakeConsensusReqIterator implements ConsensusReqReader.ReqIterator {
+    private long nextSearchIndex; // search index that the next call to next() looks for
+
+    public FakeConsensusReqIterator(long startIndex) {
+      this.nextSearchIndex = startIndex;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return true; // unconditional: a true result does not guarantee that next() finds an entry
+    }
+
+    @Override
+    public IndexedConsensusRequest next() {
+      synchronized (requestSets) {
+        for (IndexedConsensusRequest indexedConsensusRequest : requestSets.getRequestSet()) {
+          if (indexedConsensusRequest.getSearchIndex() == nextSearchIndex) {
+            nextSearchIndex++; // the cursor advances only when a match is found
+            return indexedConsensusRequest;
+          }
+        }
+        return null; // nothing stored at nextSearchIndex; the cursor is left unchanged
+      }
+    }
+
+    @Override
+    public void waitForNextReady() throws InterruptedException {
+      while (!hasNext()) { // never entered, because hasNext() in this class always returns true
+        requestSets.waitForNextReady();
+      }
+    }
+
+    @Override
+    public void waitForNextReady(long time, TimeUnit unit)
+        throws InterruptedException, TimeoutException {
+      while (!hasNext()) { // never entered, because hasNext() in this class always returns true
+        requestSets.waitForNextReady(time, unit);
+      }
+    }
+
+    @Override
+    public void skipTo(long targetIndex) {
+      nextSearchIndex = targetIndex; // repositions the cursor without checking that the index exists
+    }
+  }
+}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/RequestSets.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/RequestSets.java
new file mode 100644
index 0000000000..5b16573280
--- /dev/null
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/RequestSets.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.util;
+
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class RequestSets { // test helper: a set of indexed requests plus a counter of locally written ones
+  private final Set<IndexedConsensusRequest> requestSet; // supplied by the caller and exposed as-is by getRequestSet()
+  private long localRequestNumber = 0; // plain long, updated without any synchronization inside this class
+
+  public RequestSets(Set<IndexedConsensusRequest> requests) {
+    this.requestSet = requests;
+  }
+
+  public void add(IndexedConsensusRequest request, boolean local) {
+    if (local) {
+      localRequestNumber++; // only requests flagged as local are counted
+    }
+    requestSet.add(request);
+  }
+
+  public Set<IndexedConsensusRequest> getRequestSet() {
+    return requestSet; // returns the live set, not a copy
+  }
+
+  public void waitForNextReady() throws InterruptedException {} // no-op: returns immediately without waiting
+
+  public boolean waitForNextReady(long time, TimeUnit unit) throws InterruptedException {
+    return true; // no-op: ignores the timeout arguments and returns true immediately
+  }
+
+  public long getLocalRequestNumber() {
+    return localRequestNumber;
+  }
+}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestEntry.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestEntry.java
new file mode 100644
index 0000000000..214af5a8d6
--- /dev/null
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestEntry.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.util;
+
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class TestEntry implements IConsensusRequest { // immutable test request made of an int and a Peer
+
+  private final int num;
+  private final Peer peer;
+
+  public TestEntry(int num, Peer peer) {
+    this.num = num;
+    this.peer = peer;
+  }
+
+  @Override
+  public ByteBuffer serializeToByteBuffer() { // written layout: the int num first, then whatever peer.serialize emits
+    try (PublicBAOS publicBAOS = new PublicBAOS();
+        DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
+      outputStream.writeInt(num);
+      peer.serialize(outputStream);
+      return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size()); // limits the buffer to size() bytes of the backing array
+    } catch (IOException e) {
+      throw new RuntimeException(e); // rethrown unchecked with the original exception as the cause
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) { // value equality on num and peer, same concrete class required
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    TestEntry testEntry = (TestEntry) o;
+    return num == testEntry.num && Objects.equals(peer, testEntry.peer);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(num, peer); // uses the same two fields as equals
+  }
+
+  @Override
+  public String toString() {
+    return "TestEntry{" + "num=" + num + ", peer=" + peer + '}';
+  }
+}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
new file mode 100644
index 0000000000..a7803ba689
--- /dev/null
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.util;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class TestStateMachine implements IStateMachine, IStateMachine.EventApi { // test state machine that records every written request
+
+  private final RequestSets requestSets = new RequestSets(ConcurrentHashMap.newKeySet()); // concurrent set; write() additionally locks on requestSets
+
+  public Set<IndexedConsensusRequest> getRequestSet() {
+    return requestSets.getRequestSet();
+  }
+
+  public Set<TestEntry> getData() { // collects the inner request of every stored entry into a new HashSet
+    Set<TestEntry> data = new HashSet<>();
+    requestSets.getRequestSet().forEach(x -> data.add((TestEntry) x.getRequest())); // cast has no instanceof guard
+    return data;
+  }
+
+  @Override
+  public void start() {}
+
+  @Override
+  public void stop() {}
+
+  @Override
+  public TSStatus write(IConsensusRequest request) { // stores the request and always reports SUCCESS_STATUS
+    synchronized (requestSets) {
+      IConsensusRequest innerRequest = ((IndexedConsensusRequest) request).getRequest(); // cast has no instanceof guard
+      if (innerRequest instanceof ByteBufferConsensusRequest) { // presumably a request replicated from another peer - TODO confirm
+        ByteBuffer buffer = innerRequest.serializeToByteBuffer();
+        requestSets.add(
+            new IndexedConsensusRequest(
+                ((IndexedConsensusRequest) request).getSearchIndex(),
+                -1,
+                new TestEntry(buffer.getInt(), Peer.deserialize(buffer))), // read back as an int followed by a Peer
+            false); // stored as non-local, so it is not counted by getLocalRequestNumber()
+      } else {
+        requestSets.add(((IndexedConsensusRequest) request), true); // stored unchanged and counted as a local request
+      }
+      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+    }
+  }
+
+  @Override
+  public synchronized DataSet read(IConsensusRequest request) {
+    if (request instanceof GetConsensusReqReaderPlan) {
+      return new FakeConsensusReqReader(requestSets); // new reader over the same shared RequestSets instance
+    }
+    return null; // every other request type yields null
+  }
+
+  @Override
+  public boolean takeSnapshot(File snapshotDir) {
+    return false; // always false; no snapshot is written
+  }
+
+  @Override
+  public void loadSnapshot(File latestSnapshotRootDir) {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/WalChecker.java b/server/src/main/java/org/apache/iotdb/db/tools/WalChecker.java
index f982aae06b..9c4da773e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/WalChecker.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/WalChecker.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.tools;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.db.exception.SystemCheckException;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.wal.buffer.WALEntry;
 import org.apache.iotdb.db.wal.utils.WALFileUtils;
 
@@ -35,6 +36,7 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -93,6 +95,12 @@ public class WalChecker {
       while (logStream.available() > 0) {
         WALEntry walEntry = WALEntry.deserialize(logStream);
         totalSize += walEntry.serializedSize();
+        if (walEntry.getValue() instanceof InsertTabletNode) {
+          InsertTabletNode insertNode = (InsertTabletNode) walEntry.getValue();
+          System.err.printf(
+              "searchIndex: %s, timestamp: %s%n",
+              insertNode.getSearchIndex(), Arrays.toString(insertNode.getTimes()));
+        }
       }
     } catch (EOFException e) {
       if (totalSize == walFile.length()) {
@@ -122,12 +130,7 @@ public class WalChecker {
 
   /** @param args walRootDirectory */
   public static void main(String[] args) throws SystemCheckException {
-    if (args.length < 1) {
-      logger.error("No enough args: require the walRootDirectory");
-      return;
-    }
-
-    WalChecker checker = new WalChecker(args[0]);
+    WalChecker checker = new WalChecker("/Users/heimingz/Desktop/0"); // NOTE(review): hard-coded local path; args is no longer read, although the Javadoc still documents walRootDirectory
     List<File> files = checker.doCheck();
     report(files);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
index 51d0ff33ae..e73f053469 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
@@ -103,4 +103,9 @@ public abstract class AbstractWALBuffer implements IWALBuffer {
     currentWALFileWriter = new WALWriter(nextLogFile);
     logger.debug("Open new wal file {} for wal node-{}'s buffer.", nextLogFile, identifier);
   }
+
+  @Override
+  public long getCurrentSearchIndex() {
+    return currentSearchIndex; // field is declared outside this hunk; read here without any locking
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java
index b5a43b0cd0..ca76b89fcd 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java
@@ -40,6 +40,9 @@ public interface IWALBuffer extends AutoCloseable {
   /** Get current wal file's size */
   long getCurrentWALFileSize();
 
+  /** Get current search index */
+  long getCurrentSearchIndex();
+
   @Override
   void close();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
index 507d02622a..54a1ba09a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
@@ -119,6 +119,11 @@ public class WALFakeNode implements IWALNode {
     // do nothing
   }
 
+  @Override
+  public long getCurrentSearchIndex() {
+    throw new UnsupportedOperationException(); // the fake node has no search index; callers must not invoke this
+  }
+
   public static WALFakeNode getFailureInstance(Exception e) {
     return new WALFakeNode(
         WALFlushListener.Status.FAILURE,
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index 6827a97de2..1709ee22ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
@@ -77,7 +78,6 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import static org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode.DEFAULT_SAFELY_DELETED_SEARCH_INDEX;
-import static org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode.NO_CONSENSUS_INDEX;
 
 /**
  * This class encapsulates {@link IWALBuffer} and {@link CheckpointManager}. If search is enabled,
@@ -135,8 +135,7 @@ public class WALNode implements IWALNode {
 
   @Override
   public WALFlushListener log(long memTableId, InsertRowNode insertRowNode) {
-    if (insertRowNode.getSearchIndex() != NO_CONSENSUS_INDEX
-        && insertRowNode.getSafelyDeletedSearchIndex() != DEFAULT_SAFELY_DELETED_SEARCH_INDEX) {
+    if (insertRowNode.getSafelyDeletedSearchIndex() != DEFAULT_SAFELY_DELETED_SEARCH_INDEX) {
       safelyDeletedSearchIndex = insertRowNode.getSafelyDeletedSearchIndex();
     }
     WALEntry walEntry = new WALEntry(memTableId, insertRowNode);
@@ -153,8 +152,7 @@ public class WALNode implements IWALNode {
   @Override
   public WALFlushListener log(
       long memTableId, InsertTabletNode insertTabletNode, int start, int end) {
-    if (insertTabletNode.getSearchIndex() != NO_CONSENSUS_INDEX
-        && insertTabletNode.getSafelyDeletedSearchIndex() != DEFAULT_SAFELY_DELETED_SEARCH_INDEX) {
+    if (insertTabletNode.getSafelyDeletedSearchIndex() != DEFAULT_SAFELY_DELETED_SEARCH_INDEX) {
       safelyDeletedSearchIndex = insertTabletNode.getSafelyDeletedSearchIndex();
     }
     WALEntry walEntry = new WALEntry(memTableId, insertTabletNode, start, end);
@@ -237,7 +235,7 @@ public class WALNode implements IWALNode {
         }
       }
 
-      logger.debug(
+      logger.info(
           "Start deleting outdated wal files for wal node-{}, the first valid version id is {}, and the safely deleted search index is {}.",
           identifier,
           firstValidVersionId,
@@ -246,11 +244,6 @@ public class WALNode implements IWALNode {
       // delete outdated files
       deleteOutdatedFiles();
 
-      // wal is used to search, cannot optimize files deletion
-      if (safelyDeletedSearchIndex != DEFAULT_SAFELY_DELETED_SEARCH_INDEX) {
-        return;
-      }
-
       // calculate effective information ratio
       long costOfActiveMemTables = checkpointManager.getTotalCostOfActiveMemTables();
       long costOfFlushedMemTables = totalCostOfFlushedMemTables.get();
@@ -276,6 +269,10 @@ public class WALNode implements IWALNode {
             identifier,
             config.getWalMinEffectiveInfoRatio());
         if (snapshotOrFlushMemTable() && recursionTime < MAX_RECURSION_TIME) {
+          // wal is used to search, cannot optimize files deletion
+          if (safelyDeletedSearchIndex != DEFAULT_SAFELY_DELETED_SEARCH_INDEX) {
+            return;
+          }
           run();
           recursionTime++;
         }
@@ -291,8 +288,10 @@ public class WALNode implements IWALNode {
       }
       // delete files whose content's search index are all <= safelyDeletedSearchIndex
       WALFileUtils.ascSortByVersionId(filesToDelete);
+      // judge DEFAULT_SAFELY_DELETED_SEARCH_INDEX for standalone, Long.MIN_VALUE for multi-leader
       int endFileIndex =
           safelyDeletedSearchIndex == DEFAULT_SAFELY_DELETED_SEARCH_INDEX
+                  || safelyDeletedSearchIndex == Long.MIN_VALUE
               ? filesToDelete.length
               : WALFileUtils.binarySearchFileBySearchIndex(
                   filesToDelete, safelyDeletedSearchIndex + 1);
@@ -670,11 +669,6 @@ public class WALNode implements IWALNode {
     private int currentFileIndex = -1;
     /** true means filesToSearch and currentFileIndex are outdated, call updateFilesToSearch */
     private boolean needUpdatingFilesToSearch = true;
-    /**
-     * files whose version id before this value have already been searched, avoid storing too many
-     * files in filesToSearch
-     */
-    private long searchedFilesVersionId = 0;
     /** batch store insert nodes */
     private final List<InsertNode> insertNodes = new LinkedList<>();
     /** iterator of insertNodes */
@@ -698,6 +692,17 @@ public class WALNode implements IWALNode {
       if (needUpdatingFilesToSearch || filesToSearch == null) {
         updateFilesToSearch();
         if (needUpdatingFilesToSearch) {
+          logger.warn("update file to search failed");
+          return false;
+        }
+      }
+
+      // find file contains search index
+      while (WALFileUtils.parseStatusCode(filesToSearch[currentFileIndex].getName())
+          == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
+        currentFileIndex++;
+        if (currentFileIndex >= filesToSearch.length) {
+          needUpdatingFilesToSearch = true;
           return false;
         }
       }
@@ -807,9 +812,6 @@ public class WALNode implements IWALNode {
       // update file index and version id
       if (currentFileIndex >= filesToSearch.length) {
         needUpdatingFilesToSearch = true;
-      } else {
-        searchedFilesVersionId =
-            WALFileUtils.parseVersionId(filesToSearch[currentFileIndex].getName());
       }
 
       // update iterator
@@ -821,7 +823,7 @@ public class WALNode implements IWALNode {
     }
 
     @Override
-    public IConsensusRequest next() {
+    public IndexedConsensusRequest next() {
       if (itr == null && !hasNext()) {
         throw new NoSuchElementException();
       }
@@ -846,7 +848,7 @@ public class WALNode implements IWALNode {
             String.format("Search index of wal node-%s are out of order", identifier));
       }
 
-      return insertNode;
+      return new IndexedConsensusRequest(insertNode.getSearchIndex(), -1, insertNode);
     }
 
     @Override
@@ -882,7 +884,6 @@ public class WALNode implements IWALNode {
 
     /** Reset all params except nextSearchIndex */
     private void reset() {
-      searchedFilesVersionId = -1;
       insertNodes.clear();
       itr = null;
       filesToSearch = null;
@@ -891,14 +892,14 @@ public class WALNode implements IWALNode {
     }
 
     private void updateFilesToSearch() {
-      File[] filesToSearch = logDirectory.listFiles(this::filterFilesToSearch);
+      File[] filesToSearch = WALFileUtils.listAllWALFiles(logDirectory);
       WALFileUtils.ascSortByVersionId(filesToSearch);
       int fileIndex = WALFileUtils.binarySearchFileBySearchIndex(filesToSearch, nextSearchIndex);
+      logger.debug(
+          "searchIndex: {}, result: {}, files: {}, ", nextSearchIndex, fileIndex, filesToSearch);
       if (filesToSearch != null && fileIndex >= 0) { // possible to find next
         this.filesToSearch = filesToSearch;
         this.currentFileIndex = fileIndex;
-        this.searchedFilesVersionId =
-            WALFileUtils.parseVersionId(this.filesToSearch[currentFileIndex].getName());
         this.needUpdatingFilesToSearch = false;
       } else { // impossible to find next
         this.filesToSearch = null;
@@ -906,18 +907,13 @@ public class WALNode implements IWALNode {
         this.needUpdatingFilesToSearch = true;
       }
     }
+  }
 
-    private boolean filterFilesToSearch(File dir, String name) {
-      Pattern pattern = WALFileUtils.WAL_FILE_NAME_PATTERN;
-      Matcher matcher = pattern.matcher(name);
-      boolean toSearch = false;
-      if (matcher.find()) {
-        long versionId = Long.parseLong(matcher.group(IoTDBConstant.WAL_VERSION_ID));
-        toSearch = versionId >= searchedFilesVersionId;
-      }
-      return toSearch;
-    }
+  @Override
+  public long getCurrentSearchIndex() {
+    return buffer.getCurrentSearchIndex();
   }
+
   // endregion
 
   @Override
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
index e4603b7607..3350894304 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
@@ -248,27 +248,27 @@ public class ConsensusReqReaderTest {
     ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
 
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next();
+    request = iterator.next().getRequest();
     Assert.assertTrue(request instanceof InsertRowNode);
     Assert.assertEquals(1, ((InsertRowNode) request).getSearchIndex());
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next();
+    request = iterator.next().getRequest();
     Assert.assertTrue(request instanceof InsertRowsOfOneDeviceNode);
     Assert.assertEquals(2, ((InsertRowsOfOneDeviceNode) request).getSearchIndex());
     Assert.assertEquals(
         3, ((InsertRowsOfOneDeviceNode) request).getInsertRowNodeIndexList().size());
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next();
+    request = iterator.next().getRequest();
     Assert.assertTrue(request instanceof InsertRowsNode);
     Assert.assertEquals(3, ((InsertRowsNode) request).getSearchIndex());
     Assert.assertEquals(3, ((InsertRowsNode) request).getInsertRowNodeIndexList().size());
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next();
+    request = iterator.next().getRequest();
     Assert.assertTrue(request instanceof InsertMultiTabletsNode);
     Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
     Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next();
+    request = iterator.next().getRequest();
     Assert.assertTrue(request instanceof InsertTabletNode);
     Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
     Assert.assertFalse(iterator.hasNext());
@@ -281,12 +281,12 @@ public class ConsensusReqReaderTest {
     ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(4);
 
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next();
+    request = iterator.next().getRequest();
     Assert.assertTrue(request instanceof InsertMultiTabletsNode);
     Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
     Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next();
+    request = iterator.next().getRequest();
     Assert.assertTrue(request instanceof InsertTabletNode);
     Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
 
@@ -298,7 +298,8 @@ public class ConsensusReqReaderTest {
             () -> {
               iterator.waitForNextReady();
               Assert.assertTrue(iterator.hasNext());
-              IConsensusRequest req = iterator.next();
+              IConsensusRequest req = iterator.next().getRequest();
+              ;
               Assert.assertTrue(req instanceof InsertRowNode);
               Assert.assertEquals(6, ((InsertRowNode) req).getSearchIndex());
               return true;
@@ -317,7 +318,7 @@ public class ConsensusReqReaderTest {
     ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(5);
 
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next();
+    request = iterator.next().getRequest();
     Assert.assertTrue(request instanceof InsertTabletNode);
     Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
 
@@ -329,7 +330,8 @@ public class ConsensusReqReaderTest {
             () -> {
               iterator.waitForNextReady();
               Assert.assertTrue(iterator.hasNext());
-              IConsensusRequest req = iterator.next();
+              IConsensusRequest req = iterator.next().getRequest();
+              ;
               Assert.assertTrue(req instanceof InsertRowNode);
               Assert.assertEquals(6, ((InsertRowNode) req).getSearchIndex());
               return true;
@@ -349,11 +351,11 @@ public class ConsensusReqReaderTest {
     ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
 
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next();
+    request = iterator.next().getRequest();
     Assert.assertTrue(request instanceof InsertRowNode);
     Assert.assertEquals(1, ((InsertRowNode) request).getSearchIndex());
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next();
+    request = iterator.next().getRequest();
     Assert.assertTrue(request instanceof InsertRowsOfOneDeviceNode);
     Assert.assertEquals(2, ((InsertRowsOfOneDeviceNode) request).getSearchIndex());
     Assert.assertEquals(
@@ -362,12 +364,12 @@ public class ConsensusReqReaderTest {
     iterator.skipTo(4);
 
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next();
+    request = iterator.next().getRequest();
     Assert.assertTrue(request instanceof InsertMultiTabletsNode);
     Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
     Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next();
+    request = iterator.next().getRequest();
     Assert.assertTrue(request instanceof InsertTabletNode);
     Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
     Assert.assertFalse(iterator.hasNext());
@@ -380,30 +382,30 @@ public class ConsensusReqReaderTest {
     ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(5);
 
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next();
+    request = iterator.next().getRequest();
     Assert.assertTrue(request instanceof InsertTabletNode);
     Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
 
     iterator.skipTo(2);
 
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next();
+    request = iterator.next().getRequest();
     Assert.assertTrue(request instanceof InsertRowsOfOneDeviceNode);
     Assert.assertEquals(2, ((InsertRowsOfOneDeviceNode) request).getSearchIndex());
     Assert.assertEquals(
         3, ((InsertRowsOfOneDeviceNode) request).getInsertRowNodeIndexList().size());
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next();
+    request = iterator.next().getRequest();
     Assert.assertTrue(request instanceof InsertRowsNode);
     Assert.assertEquals(3, ((InsertRowsNode) request).getSearchIndex());
     Assert.assertEquals(3, ((InsertRowsNode) request).getInsertRowNodeIndexList().size());
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next();
+    request = iterator.next().getRequest();
     Assert.assertTrue(request instanceof InsertMultiTabletsNode);
     Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
     Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
     Assert.assertTrue(iterator.hasNext());
-    request = iterator.next();
+    request = iterator.next().getRequest();
     Assert.assertTrue(request instanceof InsertTabletNode);
     Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
     Assert.assertFalse(iterator.hasNext());