You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/05/25 01:15:33 UTC

[iotdb] branch master updated: [IOTDB-3232] Implement read interface for WAL (#5993)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f8c634848 [IOTDB-3232] Implement read interface for WAL (#5993)
5f8c634848 is described below

commit 5f8c63484881fd22f1d854d9f3f48da994700d51
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Wed May 25 09:15:25 2022 +0800

    [IOTDB-3232] Implement read interface for WAL (#5993)
---
 .../iotdb/consensus/wal/ConsensusReqReader.java    |  24 +-
 .../apache/iotdb/commons/conf/IoTDBConstant.java   |   3 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  11 +
 .../iotdb/db/engine/storagegroup/DataRegion.java   |  15 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |   4 +-
 .../plan/node/write/InsertMultiTabletsNode.java    |  15 +-
 .../plan/planner/plan/node/write/InsertNode.java   |   6 +-
 .../planner/plan/node/write/InsertRowsNode.java    |   9 +
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |   9 +
 .../db/service/metrics/predefined/FileMetrics.java |   3 +-
 .../java/org/apache/iotdb/db/tools/WalChecker.java |  13 +-
 .../java/org/apache/iotdb/db/wal/WALManager.java   | 111 +----
 .../allocation/AbstractNodeAllocationStrategy.java |  79 ++++
 .../db/wal/allocation/FirstCreateStrategy.java     | 105 +++++
 .../NodeAllocationStrategy.java}                   |  26 +-
 .../db/wal/allocation/RoundRobinStrategy.java      | 106 +++++
 .../iotdb/db/wal/buffer/AbstractWALBuffer.java     |  11 +-
 .../org/apache/iotdb/db/wal/buffer/IWALBuffer.java |  13 +-
 .../org/apache/iotdb/db/wal/buffer/WALBuffer.java  |  59 ++-
 .../iotdb/db/wal/checkpoint/CheckpointManager.java |  13 +-
 .../apache/iotdb/db/wal/io/CheckpointWriter.java   |  30 --
 .../java/org/apache/iotdb/db/wal/io/WALWriter.java |  29 --
 .../java/org/apache/iotdb/db/wal/node/WALNode.java | 434 +++++++++++++++--
 .../db/wal/recover/CheckpointRecoverUtils.java     |  12 +-
 .../iotdb/db/wal/recover/WALNodeRecoverTask.java   |  35 +-
 .../iotdb/db/wal/recover/WALRecoverManager.java    |   3 +-
 .../CheckpointFileUtils.java}                      |  50 +-
 .../apache/iotdb/db/wal/utils/WALFileUtils.java    | 134 ++++++
 .../org/apache/iotdb/db/tools/WalCheckerTest.java  |   9 +-
 .../org/apache/iotdb/db/wal/DisableWALTest.java    |   2 +-
 .../org/apache/iotdb/db/wal/WALManagerTest.java    |  32 +-
 .../FirstCreateStrategyTest.java}                  |  79 +++-
 .../RoundRobinStrategyTest.java}                   |  47 +-
 .../iotdb/db/wal/buffer/WALBufferCommonTest.java   |   4 +-
 .../db/wal/checkpoint/CheckpointManagerTest.java   |  17 +-
 .../iotdb/db/wal/node/ConsensusReqReaderTest.java  | 516 +++++++++++++++++++++
 .../org/apache/iotdb/db/wal/node/WALNodeTest.java  |  18 +-
 .../iotdb/db/wal/utils/WALFileUtilsTest.java       | 215 +++++++++
 38 files changed, 1954 insertions(+), 347 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/wal/ConsensusReqReader.java b/consensus/src/main/java/org/apache/iotdb/consensus/wal/ConsensusReqReader.java
index 1b1929c8c8..d27e9b93e5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/wal/ConsensusReqReader.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/wal/ConsensusReqReader.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 /** This interface provides search interface for consensus requests via index. */
@@ -30,7 +31,7 @@ public interface ConsensusReqReader {
    * Gets the consensus request at the specified position.
    *
    * @param index index of the consensus request to return
-   * @return the consensus request at the specified position
+   * @return the consensus request at the specified position, null if the request doesn't exist
    */
   IConsensusRequest getReq(long index);
 
@@ -57,23 +58,24 @@ public interface ConsensusReqReader {
     /** Like {@link Iterator#hasNext()} */
     boolean hasNext();
 
-    /** Like {@link Iterator#next()} */
+    /**
+     * Like {@link Iterator#next()}
+     *
+     * @throws java.util.NoSuchElementException if the iteration has no more elements, wait a moment
+     *     or call {@link this#waitForNextReady} for more elements
+     */
     IConsensusRequest next();
 
     /**
-     * Returns the next element in the iteration, blocked until next element is available.
-     *
-     * @return the next element in the iteration
+     * Wait for the next element in the iteration ready, blocked until next element is available.
      */
-    IConsensusRequest waitForNext() throws InterruptedException;
+    void waitForNextReady() throws InterruptedException;
 
     /**
-     * Returns the next element in the iteration, blocked until next element is available or a
-     * specified amount of time has elapsed.
-     *
-     * @return the next element in the iteration
+     * Wait for the next element in the iteration ready, blocked until next element is available or
+     * a specified amount of time has elapsed.
      */
-    IConsensusRequest waitForNext(long timeout) throws InterruptedException, TimeoutException;
+    void waitForNextReady(long time, TimeUnit unit) throws InterruptedException, TimeoutException;
 
     /**
      * Skips to target position of next element in the iteration <br>
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index 50f6e90b40..0a877d95ac 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -211,8 +211,11 @@ public class IoTDBConstant {
   public static final String COMPACTION_MODIFICATION_FILE_NAME_FROM_OLD = "merge.mods";
 
   // write ahead log
+  public static final String WAL_FILE_PREFIX = "_";
   public static final String WAL_FILE_SUFFIX = ".wal";
   public static final String WAL_CHECKPOINT_FILE_SUFFIX = ".checkpoint";
+  public static final String WAL_VERSION_ID = "versionId";
+  public static final String WAL_START_SEARCH_INDEX = "startSearchIndex";
 
   // client version number
   public enum ClientVersion {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index e345290846..cf3e35f044 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -181,6 +181,9 @@ public class IoTDBConfig {
   /** Size threshold of each wal file. Unit: byte */
   private volatile long walFileSizeThresholdInByte = 10 * 1024 * 1024;
 
+  /** Size threshold of each checkpoint file. Unit: byte */
+  private volatile long checkpointFileSizeThresholdInByte = 3 * 1024 * 1024;
+
   /** Minimum ratio of effective information in wal files */
   private volatile double walMinEffectiveInfoRatio = 0.1;
 
@@ -1463,6 +1466,14 @@ public class IoTDBConfig {
     this.walFileSizeThresholdInByte = walFileSizeThresholdInByte;
   }
 
+  public long getCheckpointFileSizeThresholdInByte() {
+    return checkpointFileSizeThresholdInByte;
+  }
+
+  public void setCheckpointFileSizeThresholdInByte(long checkpointFileSizeThresholdInByte) {
+    this.checkpointFileSizeThresholdInByte = checkpointFileSizeThresholdInByte;
+  }
+
   public double getWalMinEffectiveInfoRatio() {
     return walMinEffectiveInfoRatio;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index e897ae4856..7bb7abe855 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -82,6 +82,8 @@ import org.apache.iotdb.db.sync.sender.manager.TsFileSyncManager;
 import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool;
 import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
 import org.apache.iotdb.db.utils.UpgradeUtils;
+import org.apache.iotdb.db.wal.WALManager;
+import org.apache.iotdb.db.wal.node.IWALNode;
 import org.apache.iotdb.db.wal.recover.WALRecoverManager;
 import org.apache.iotdb.db.wal.recover.file.SealedTsFileRecoverPerformer;
 import org.apache.iotdb.db.wal.recover.file.UnsealedTsFileRecoverPerformer;
@@ -1437,7 +1439,7 @@ public class DataRegion {
     if (sequence) {
       tsFileProcessor =
           new TsFileProcessor(
-              logicalStorageGroupName + File.separator + dataRegionId,
+              logicalStorageGroupName + FILE_NAME_SEPARATOR + dataRegionId,
               fsFactory.getFileWithParent(filePath),
               storageGroupInfo,
               this::closeUnsealedTsFileProcessorCallBack,
@@ -1446,7 +1448,7 @@ public class DataRegion {
     } else {
       tsFileProcessor =
           new TsFileProcessor(
-              logicalStorageGroupName + File.separator + dataRegionId,
+              logicalStorageGroupName + FILE_NAME_SEPARATOR + dataRegionId,
               fsFactory.getFileWithParent(filePath),
               storageGroupInfo,
               this::closeUnsealedTsFileProcessorCallBack,
@@ -3502,6 +3504,15 @@ public class DataRegion {
     return idTable;
   }
 
+  public IWALNode getWALNode() {
+    if (!config.isClusterMode()) {
+      throw new UnsupportedOperationException();
+    }
+    // identifier should be same with getTsFileProcessor method
+    return WALManager.getInstance()
+        .applyForWALNode(logicalStorageGroupName + FILE_NAME_SEPARATOR + dataRegionId);
+  }
+
   @TestOnly
   public ILastFlushTimeManager getLastFlushTimeManager() {
     return lastFlushTimeManager;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 8cac805ba2..37d67e1f3b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -181,7 +181,7 @@ public class TsFileProcessor {
     this.writer = new RestorableTsFileIOWriter(tsfile);
     this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback;
     this.sequence = sequence;
-    this.walNode = WALManager.getInstance().applyForWALNode();
+    this.walNode = WALManager.getInstance().applyForWALNode(storageGroupName);
     flushListeners.add(FlushListener.DefaultMemTableFLushListener.INSTANCE);
     flushListeners.add(this.walNode);
     closeFileListeners.add(closeTsFileCallback);
@@ -203,7 +203,7 @@ public class TsFileProcessor {
     this.writer = writer;
     this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback;
     this.sequence = sequence;
-    this.walNode = WALManager.getInstance().applyForWALNode();
+    this.walNode = WALManager.getInstance().applyForWALNode(storageGroupName);
     flushListeners.add(FlushListener.DefaultMemTableFLushListener.INSTANCE);
     flushListeners.add(this.walNode);
     closeFileListeners.add(closeUnsealedTsFileProcessor);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index 41c37e7963..aff9170d6e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -96,11 +96,20 @@ public class InsertMultiTabletsNode extends InsertNode implements BatchInsertNod
     insertTabletNodeList = new ArrayList<>();
   }
 
+  public InsertMultiTabletsNode(
+      PlanNodeId id,
+      List<Integer> parentInsertTabletNodeIndexList,
+      List<InsertTabletNode> insertTabletNodeList) {
+    super(id);
+    this.parentInsertTabletNodeIndexList = parentInsertTabletNodeIndexList;
+    this.insertTabletNodeList = insertTabletNodeList;
+  }
+
   public List<Integer> getParentInsertTabletNodeIndexList() {
     return parentInsertTabletNodeIndexList;
   }
 
-  public void setParentInsertTabletNodeIndexList(List<Integer> parentInsertTabletNodeIndexList) {
+  private void setParentInsertTabletNodeIndexList(List<Integer> parentInsertTabletNodeIndexList) {
     this.parentInsertTabletNodeIndexList = parentInsertTabletNodeIndexList;
   }
 
@@ -108,7 +117,7 @@ public class InsertMultiTabletsNode extends InsertNode implements BatchInsertNod
     return insertTabletNodeList;
   }
 
-  public void setInsertTabletNodeList(List<InsertTabletNode> insertTabletNodeList) {
+  private void setInsertTabletNodeList(List<InsertTabletNode> insertTabletNodeList) {
     this.insertTabletNodeList = insertTabletNodeList;
   }
 
@@ -119,11 +128,13 @@ public class InsertMultiTabletsNode extends InsertNode implements BatchInsertNod
 
   @Override
   public void setSearchIndex(long index) {
+    searchIndex = index;
     insertTabletNodeList.forEach(plan -> plan.setSearchIndex(index));
   }
 
   @Override
   public void setSafelyDeletedSearchIndex(long index) {
+    safelyDeletedSearchIndex = index;
     insertTabletNodeList.forEach(plan -> plan.setSafelyDeletedSearchIndex(index));
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index 1fa2195dde..f5b15139e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -71,7 +71,10 @@ public abstract class InsertNode extends WritePlanNode implements IConsensusRequ
    */
   protected IDeviceID deviceID;
 
-  /** this index is used by wal search, its order should be protected by the upper layer */
+  /**
+   * this index is used by wal search, its order should be protected by the upper layer, and the
+   * value should start from 1
+   */
   protected long searchIndex = NO_CONSENSUS_INDEX;
   /**
    * this index pass info to wal, indicating that insert nodes whose search index are before this
@@ -155,6 +158,7 @@ public abstract class InsertNode extends WritePlanNode implements IConsensusRequ
     return searchIndex;
   }
 
+  /** Search index should start from 1 */
   public void setSearchIndex(long searchIndex) {
     this.searchIndex = searchIndex;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
index f417ebc578..033b343541 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
@@ -63,6 +63,13 @@ public class InsertRowsNode extends InsertNode implements BatchInsertNode {
     insertRowNodeIndexList = new ArrayList<>();
   }
 
+  public InsertRowsNode(
+      PlanNodeId id, List<Integer> insertRowNodeIndexList, List<InsertRowNode> insertRowNodeList) {
+    super(id);
+    this.insertRowNodeIndexList = insertRowNodeIndexList;
+    this.insertRowNodeList = insertRowNodeList;
+  }
+
   /** record the result of insert rows */
   private Map<Integer, TSStatus> results = new HashMap<>();
 
@@ -89,11 +96,13 @@ public class InsertRowsNode extends InsertNode implements BatchInsertNode {
 
   @Override
   public void setSearchIndex(long index) {
+    searchIndex = index;
     insertRowNodeList.forEach(plan -> plan.setSearchIndex(index));
   }
 
   @Override
   public void setSafelyDeletedSearchIndex(long index) {
+    safelyDeletedSearchIndex = index;
     insertRowNodeList.forEach(plan -> plan.setSafelyDeletedSearchIndex(index));
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 49a6093158..bc31abf10b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -72,17 +72,26 @@ public class InsertRowsOfOneDeviceNode extends InsertNode implements BatchInsert
     insertRowNodeList = new ArrayList<>();
   }
 
+  public InsertRowsOfOneDeviceNode(
+      PlanNodeId id, List<Integer> insertRowNodeIndexList, List<InsertRowNode> insertRowNodeList) {
+    super(id);
+    this.insertRowNodeIndexList = insertRowNodeIndexList;
+    this.insertRowNodeList = insertRowNodeList;
+  }
+
   public Map<Integer, TSStatus> getResults() {
     return results;
   }
 
   @Override
   public void setSearchIndex(long index) {
+    searchIndex = index;
     insertRowNodeList.forEach(plan -> plan.setSearchIndex(index));
   }
 
   @Override
   public void setSafelyDeletedSearchIndex(long index) {
+    safelyDeletedSearchIndex = index;
     insertRowNodeList.forEach(plan -> plan.setSafelyDeletedSearchIndex(index));
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/predefined/FileMetrics.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/predefined/FileMetrics.java
index 0cf0c19879..3e846c132e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/predefined/FileMetrics.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/predefined/FileMetrics.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.service.metrics.enums.Metric;
 import org.apache.iotdb.db.service.metrics.enums.Tag;
-import org.apache.iotdb.db.wal.node.WALNode;
 import org.apache.iotdb.metrics.MetricManager;
 import org.apache.iotdb.metrics.predefined.IMetricSet;
 import org.apache.iotdb.metrics.utils.MetricLevel;
@@ -83,7 +82,7 @@ public class FileMetrics implements IMetricSet {
                 .mapToLong(
                     dir -> {
                       File walFolder = new File(dir);
-                      File[] walNodeFolders = walFolder.listFiles(WALNode::walNodeFolderNameFilter);
+                      File[] walNodeFolders = walFolder.listFiles(File::isDirectory);
                       for (File walNodeFolder : walNodeFolders) {
                         if (walNodeFolder.exists() && walNodeFolder.isDirectory()) {
                           return org.apache.commons.io.FileUtils.listFiles(
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/WalChecker.java b/server/src/main/java/org/apache/iotdb/db/tools/WalChecker.java
index ef50112c43..f982aae06b 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/WalChecker.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/WalChecker.java
@@ -22,8 +22,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.db.exception.SystemCheckException;
 import org.apache.iotdb.db.wal.buffer.WALEntry;
-import org.apache.iotdb.db.wal.io.WALWriter;
-import org.apache.iotdb.db.wal.node.WALNode;
+import org.apache.iotdb.db.wal.utils.WALFileUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,13 +63,7 @@ public class WalChecker {
       throw new SystemCheckException(walFolder);
     }
 
-    File[] walNodeFolders =
-        walFolderFile.listFiles(
-            (dir, name) -> {
-              File walNodeFolder = SystemFileFactory.INSTANCE.getFile(dir, name);
-              return walNodeFolder.isDirectory()
-                  && WALNode.WAL_NODE_FOLDER_PATTERN.matcher(name).find();
-            });
+    File[] walNodeFolders = walFolderFile.listFiles(File::isDirectory);
     if (walNodeFolders == null || walNodeFolders.length == 0) {
       logger.info("No sub-directories under the given directory, check ends");
       return Collections.emptyList();
@@ -80,7 +73,7 @@ public class WalChecker {
     for (int dirIndex = 0; dirIndex < walNodeFolders.length; dirIndex++) {
       File walNodeFolder = walNodeFolders[dirIndex];
       logger.info("Checking the No.{} directory {}", dirIndex, walNodeFolder.getName());
-      File[] walFiles = walNodeFolder.listFiles(WALWriter::walFilenameFilter);
+      File[] walFiles = WALFileUtils.listAllWALFiles(walNodeFolder);
       if (walFiles == null) {
         continue;
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java b/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
index 8468fc6cc1..f1c6cda092 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
@@ -26,9 +26,9 @@ import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.directories.FolderManager;
-import org.apache.iotdb.db.conf.directories.strategy.DirectoryStrategyType;
-import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.wal.allocation.FirstCreateStrategy;
+import org.apache.iotdb.db.wal.allocation.NodeAllocationStrategy;
+import org.apache.iotdb.db.wal.allocation.RoundRobinStrategy;
 import org.apache.iotdb.db.wal.node.IWALNode;
 import org.apache.iotdb.db.wal.node.WALFakeNode;
 import org.apache.iotdb.db.wal.node.WALNode;
@@ -37,82 +37,47 @@ import org.apache.iotdb.db.wal.utils.WALMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
-/** This class is used to manage all wal nodes */
+/** This class is used to manage and allocate wal nodes */
 public class WALManager implements IService {
   private static final Logger logger = LoggerFactory.getLogger(WALManager.class);
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private static final int MAX_WAL_NODE_NUM =
       config.getMaxWalNodesNum() > 0 ? config.getMaxWalNodesNum() : config.getWalDirs().length * 2;
 
-  /** manage wal folders */
-  private FolderManager folderManager;
-  /** protect concurrent safety of wal nodes, including walNodes, nodeCursor and nodeIdCounter */
-  private final Lock nodesLock = new ReentrantLock();
-  // region these variables should be protected by nodesLock
-  /** wal nodes, the max number of wal nodes is MAX_WAL_NUM */
-  private final List<WALNode> walNodes = new ArrayList<>(MAX_WAL_NODE_NUM);
-  /** help allocate node for users */
-  private int nodeCursor = -1;
-  /** each wal node has a unique long value identifier */
-  private long nodeIdCounter = -1;
-  // endregion
+  /** manage all wal nodes and decide how to allocate them */
+  private final NodeAllocationStrategy walNodesManager;
   /** single thread to delete old .wal files */
   private ScheduledExecutorService walDeleteThread;
 
-  private WALManager() {}
+  private WALManager() {
+    if (config.isClusterMode()) {
+      walNodesManager = new FirstCreateStrategy();
+    } else {
+      walNodesManager = new RoundRobinStrategy(MAX_WAL_NODE_NUM);
+    }
+  }
 
   /** Apply for a wal node */
-  public IWALNode applyForWALNode() {
+  public IWALNode applyForWALNode(String applicantUniqueId) {
     if (config.getWalMode() == WALMode.DISABLE) {
       return WALFakeNode.getSuccessInstance();
     }
 
-    WALNode selectedNode;
-    nodesLock.lock();
-    try {
-      if (walNodes.size() < MAX_WAL_NODE_NUM) {
-        nodeIdCounter++;
-        String identifier = String.valueOf(nodeIdCounter);
-        String folder;
-        // get wal folder
-        try {
-          folder = folderManager.getNextFolder();
-        } catch (DiskSpaceInsufficientException e) {
-          logger.error("All disks of wal folders are full, change system mode to read-only.", e);
-          config.setReadOnly(true);
-          return WALFakeNode.getFailureInstance(e);
-        }
-        folder = folder + File.separator + identifier;
-        // create new wal node
-        try {
-          selectedNode = new WALNode(identifier, folder);
-        } catch (FileNotFoundException e) {
-          logger.error("Fail to create wal node", e);
-          return WALFakeNode.getFailureInstance(e);
-        }
-        walNodes.add(selectedNode);
-      } else {
-        // select next wal node by sequence order
-        nodeCursor = (nodeCursor + 1) % MAX_WAL_NODE_NUM;
-        selectedNode = walNodes.get(nodeCursor);
-      }
-    } finally {
-      nodesLock.unlock();
+    return walNodesManager.applyForWALNode(applicantUniqueId);
+  }
+
+  public void registerWALNode(String applicantUniqueId, String logDirectory) {
+    if (config.getWalMode() == WALMode.DISABLE || !config.isClusterMode()) {
+      return;
     }
-    return selectedNode;
+
+    ((FirstCreateStrategy) walNodesManager).registerWALNode(applicantUniqueId, logDirectory);
   }
 
   @Override
@@ -122,9 +87,6 @@ public class WALManager implements IService {
     }
 
     try {
-      folderManager =
-          new FolderManager(
-              Arrays.asList(config.getWalDirs()), DirectoryStrategyType.SEQUENCE_STRATEGY);
       walDeleteThread =
           IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(ThreadName.WAL_DELETE.getName());
       walDeleteThread.scheduleWithFixedDelay(
@@ -179,26 +141,11 @@ public class WALManager implements IService {
   }
 
   private void deleteOutdatedFiles() {
-    for (WALNode walNode : getNodesSnapshot()) {
+    for (WALNode walNode : walNodesManager.getNodesSnapshot()) {
       walNode.deleteOutdatedFiles();
     }
   }
 
-  private List<WALNode> getNodesSnapshot() {
-    List<WALNode> snapshot;
-    if (walNodes.size() < MAX_WAL_NODE_NUM) {
-      nodesLock.lock();
-      try {
-        snapshot = new ArrayList<>(walNodes);
-      } finally {
-        nodesLock.unlock();
-      }
-    } else {
-      snapshot = walNodes;
-    }
-    return snapshot;
-  }
-
   @Override
   public void stop() {
     if (config.getWalMode() == WALMode.DISABLE) {
@@ -226,17 +173,7 @@ public class WALManager implements IService {
 
   @TestOnly
   public void clear() {
-    nodesLock.lock();
-    try {
-      nodeCursor = -1;
-      nodeIdCounter = -1;
-      for (WALNode walNode : walNodes) {
-        walNode.close();
-      }
-      walNodes.clear();
-    } finally {
-      nodesLock.unlock();
-    }
+    walNodesManager.clear();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/allocation/AbstractNodeAllocationStrategy.java b/server/src/main/java/org/apache/iotdb/db/wal/allocation/AbstractNodeAllocationStrategy.java
new file mode 100644
index 0000000000..876d71eeb9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/wal/allocation/AbstractNodeAllocationStrategy.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.allocation;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.FolderManager;
+import org.apache.iotdb.db.conf.directories.strategy.DirectoryStrategyType;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.wal.node.IWALNode;
+import org.apache.iotdb.db.wal.node.WALFakeNode;
+import org.apache.iotdb.db.wal.node.WALNode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.Arrays;
+
+public abstract class AbstractNodeAllocationStrategy implements NodeAllocationStrategy {
+  private static final Logger logger =
+      LoggerFactory.getLogger(AbstractNodeAllocationStrategy.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  /** manage wal folders */
+  protected FolderManager folderManager;
+
+  public AbstractNodeAllocationStrategy() {
+    try {
+      folderManager =
+          new FolderManager(
+              Arrays.asList(config.getWalDirs()), DirectoryStrategyType.SEQUENCE_STRATEGY);
+    } catch (DiskSpaceInsufficientException e) {
+      logger.error("All disks of wal folders are full, change system mode to read-only.", e);
+      config.setReadOnly(true);
+    }
+  }
+
+  protected IWALNode createWALNode(String identifier) {
+    String folder;
+    // get wal folder
+    try {
+      folder = folderManager.getNextFolder();
+    } catch (DiskSpaceInsufficientException e) {
+      logger.error("All disks of wal folders are full, change system mode to read-only.", e);
+      config.setReadOnly(true);
+      return WALFakeNode.getFailureInstance(e);
+    }
+    folder = folder + File.separator + identifier;
+    // create new wal node
+    return createWALNode(identifier, folder);
+  }
+
+  protected IWALNode createWALNode(String identifier, String folder) {
+    try {
+      return new WALNode(identifier, folder);
+    } catch (FileNotFoundException e) {
+      logger.error("Fail to create wal node", e);
+      return WALFakeNode.getFailureInstance(e);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java b/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java
new file mode 100644
index 0000000000..043a5797ca
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.allocation;
+
+import org.apache.iotdb.db.wal.node.IWALNode;
+import org.apache.iotdb.db.wal.node.WALNode;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This strategy creates one wal node for each unique identifier. In other words, each identifier
+ * (like data region) has its own wal node.
+ */
+public class FirstCreateStrategy extends AbstractNodeAllocationStrategy {
+  /** protect concurrent safety of wal nodes, including walNodes, nodeCursor and nodeIdCounter */
+  private final Lock nodesLock = new ReentrantLock();
+  // region these variables should be protected by nodesLock
+  /** wal nodes */
+  private final Map<String, WALNode> identifier2Nodes = new HashMap<>();
+  // endregion
+
+  @Override
+  public IWALNode applyForWALNode(String applicantUniqueId) {
+    nodesLock.lock();
+    try {
+      if (identifier2Nodes.containsKey(applicantUniqueId)) {
+        return identifier2Nodes.get(applicantUniqueId);
+      }
+
+      IWALNode walNode = createWALNode(applicantUniqueId);
+      if (walNode instanceof WALNode) {
+        // avoid deletion
+        ((WALNode) walNode).setSafelyDeletedSearchIndex(Long.MIN_VALUE);
+        identifier2Nodes.put(applicantUniqueId, (WALNode) walNode);
+      }
+      return walNode;
+    } finally {
+      nodesLock.unlock();
+    }
+  }
+
+  public void registerWALNode(String applicantUniqueId, String logDirectory) {
+    nodesLock.lock();
+    try {
+      if (identifier2Nodes.containsKey(applicantUniqueId)) {
+        return;
+      }
+
+      IWALNode walNode = createWALNode(applicantUniqueId, logDirectory);
+      if (walNode instanceof WALNode) {
+        // avoid deletion
+        ((WALNode) walNode).setSafelyDeletedSearchIndex(Long.MIN_VALUE);
+        identifier2Nodes.put(applicantUniqueId, (WALNode) walNode);
+      }
+    } finally {
+      nodesLock.unlock();
+    }
+  }
+
+  @Override
+  public List<WALNode> getNodesSnapshot() {
+    List<WALNode> snapshot;
+    nodesLock.lock();
+    try {
+      snapshot = new ArrayList<>(identifier2Nodes.values());
+    } finally {
+      nodesLock.unlock();
+    }
+    return snapshot;
+  }
+
+  @Override
+  public void clear() {
+    nodesLock.lock();
+    try {
+      for (WALNode walNode : identifier2Nodes.values()) {
+        walNode.close();
+      }
+      identifier2Nodes.clear();
+    } finally {
+      nodesLock.unlock();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/allocation/NodeAllocationStrategy.java
similarity index 64%
copy from server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java
copy to server/src/main/java/org/apache/iotdb/db/wal/allocation/NodeAllocationStrategy.java
index 9feef0963e..5ce3ca3ead 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/allocation/NodeAllocationStrategy.java
@@ -16,25 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.wal.buffer;
+package org.apache.iotdb.db.wal.allocation;
 
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.wal.node.IWALNode;
+import org.apache.iotdb.db.wal.node.WALNode;
 
-/** Currently, there are 2 buffer types, including wal rolling buffer and wal segmented buffer. */
-public interface IWALBuffer extends AutoCloseable {
-  /**
-   * Write WALEntry into wal buffer.
-   *
-   * @param walEntry info will be written into wal buffer
-   */
-  void write(WALEntry walEntry);
+import java.util.List;
 
-  /** Get current log version id */
-  int getCurrentWALFileVersion();
-
-  @Override
-  void close();
+/** This interface */
+public interface NodeAllocationStrategy {
+  /** Allocate one wal node for the applicant */
+  IWALNode applyForWALNode(String applicantUniqueId);
+  /** Get all wal nodes */
+  List<WALNode> getNodesSnapshot();
 
   @TestOnly
-  boolean isAllWALEntriesConsumed();
+  void clear();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/allocation/RoundRobinStrategy.java b/server/src/main/java/org/apache/iotdb/db/wal/allocation/RoundRobinStrategy.java
new file mode 100644
index 0000000000..8bd57491cc
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/wal/allocation/RoundRobinStrategy.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.allocation;
+
+import org.apache.iotdb.db.wal.node.IWALNode;
+import org.apache.iotdb.db.wal.node.WALNode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This strategy creates n wal nodes and allocate them by round-robin strategy. In other words,
+ * several identifiers (like data regions) can share one wal node.
+ */
+public class RoundRobinStrategy extends AbstractNodeAllocationStrategy {
+  /** max wal nodes number */
+  private final int maxWalNodeNum;
+  /** protect concurrent safety of wal nodes, including walNodes, nodeCursor and nodeIdCounter */
+  private final Lock nodesLock = new ReentrantLock();
+  // region these variables should be protected by nodesLock
+  /** wal nodes, the max number of wal nodes is MAX_WAL_NUM */
+  private final List<WALNode> walNodes;
+  /** help allocate node for users */
+  private int nodeCursor = -1;
+  /** each wal node has a unique long value identifier */
+  private long nodeIdCounter = -1;
+  // endregion
+
+  public RoundRobinStrategy(int maxWalNodeNum) {
+    this.maxWalNodeNum = maxWalNodeNum;
+    this.walNodes = new ArrayList<>(maxWalNodeNum);
+  }
+
+  @Override
+  public IWALNode applyForWALNode(String applicantUniqueId) {
+    WALNode selectedNode;
+    nodesLock.lock();
+    try {
+      if (walNodes.size() < maxWalNodeNum) {
+        nodeIdCounter++;
+        IWALNode node = createWALNode(String.valueOf(nodeIdCounter));
+        if (!(node instanceof WALNode)) {
+          return node;
+        }
+        selectedNode = (WALNode) node;
+        walNodes.add(selectedNode);
+      } else {
+        // select next wal node by sequence order
+        nodeCursor = (nodeCursor + 1) % maxWalNodeNum;
+        selectedNode = walNodes.get(nodeCursor);
+      }
+    } finally {
+      nodesLock.unlock();
+    }
+    return selectedNode;
+  }
+
+  @Override
+  public List<WALNode> getNodesSnapshot() {
+    List<WALNode> snapshot;
+    if (walNodes.size() < maxWalNodeNum) {
+      nodesLock.lock();
+      try {
+        snapshot = new ArrayList<>(walNodes);
+      } finally {
+        nodesLock.unlock();
+      }
+    } else {
+      snapshot = walNodes;
+    }
+    return snapshot;
+  }
+
+  @Override
+  public void clear() {
+    nodesLock.lock();
+    try {
+      nodeCursor = -1;
+      nodeIdCounter = -1;
+      for (WALNode walNode : walNodes) {
+        walNode.close();
+      }
+      walNodes.clear();
+    } finally {
+      nodesLock.unlock();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
index 4cf77c9656..127451b7d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.wal.buffer;
 import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.db.wal.io.ILogWriter;
 import org.apache.iotdb.db.wal.io.WALWriter;
+import org.apache.iotdb.db.wal.utils.WALFileUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,6 +40,8 @@ public abstract class AbstractWALBuffer implements IWALBuffer {
   protected final String logDirectory;
   /** current wal file version id */
   protected final AtomicInteger currentWALFileVersion = new AtomicInteger();
+  /** current search index */
+  protected volatile long currentSearchIndex = 0;
   /** current wal file log writer */
   protected volatile ILogWriter currentWALFileWriter;
 
@@ -52,7 +55,8 @@ public abstract class AbstractWALBuffer implements IWALBuffer {
     currentWALFileWriter =
         new WALWriter(
             SystemFileFactory.INSTANCE.getFile(
-                logDirectory, WALWriter.getLogFileName(currentWALFileVersion.get())));
+                logDirectory,
+                WALFileUtils.getLogFileName(currentWALFileVersion.get(), currentSearchIndex)));
   }
 
   @Override
@@ -61,11 +65,12 @@ public abstract class AbstractWALBuffer implements IWALBuffer {
   }
 
   /** Notice: only called by syncBufferThread and old log writer will be closed by this function. */
-  protected void rollLogWriter() throws IOException {
+  protected void rollLogWriter(long searchIndex) throws IOException {
     currentWALFileWriter.close();
     File nextLogFile =
         SystemFileFactory.INSTANCE.getFile(
-            logDirectory, WALWriter.getLogFileName(currentWALFileVersion.incrementAndGet()));
+            logDirectory,
+            WALFileUtils.getLogFileName(currentWALFileVersion.incrementAndGet(), searchIndex));
     currentWALFileWriter = new WALWriter(nextLogFile);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java
index 9feef0963e..ee57e51b2a 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java
@@ -20,7 +20,12 @@ package org.apache.iotdb.db.wal.buffer;
 
 import org.apache.iotdb.commons.utils.TestOnly;
 
-/** Currently, there are 2 buffer types, including wal rolling buffer and wal segmented buffer. */
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class serializes and flushes {@link WALEntry}. If search is enabled, the order of search
+ * index should be protected by the upper layer, and the value should start from 1.
+ */
 public interface IWALBuffer extends AutoCloseable {
   /**
    * Write WALEntry into wal buffer.
@@ -35,6 +40,12 @@ public interface IWALBuffer extends AutoCloseable {
   @Override
   void close();
 
+  /** Wait for next flush operation done */
+  void waitForFlush() throws InterruptedException;
+
+  /** Wait for next flush operation done */
+  boolean waitForFlush(long time, TimeUnit unit) throws InterruptedException;
+
   @TestOnly
   boolean isAllWALEntriesConsumed();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
index 27c6ca15fa..d9e9d5007a 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.utils.MmapUtil;
 import org.apache.iotdb.db.wal.exception.WALNodeClosedException;
 import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
@@ -172,7 +173,7 @@ public class WALBuffer extends AbstractWALBuffer {
 
       // call fsync at last and set fsyncListeners
       if (batchSize > 0) {
-        fsyncWorkingBuffer(fsyncListeners, rollWALFileWriterListener);
+        fsyncWorkingBuffer(currentSearchIndex, fsyncListeners, rollWALFileWriterListener);
       }
     }
 
@@ -203,6 +204,14 @@ public class WALBuffer extends AbstractWALBuffer {
         walEntry.getWalFlushListener().fail(e);
         return false;
       }
+      // update search index
+      if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
+          || walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
+        InsertNode insertNode = (InsertNode) walEntry.getValue();
+        if (insertNode.getSearchIndex() != InsertNode.NO_CONSENSUS_INDEX) {
+          currentSearchIndex = insertNode.getSearchIndex();
+        }
+      }
       return true;
     }
 
@@ -214,12 +223,12 @@ public class WALBuffer extends AbstractWALBuffer {
       switch (signalWALEntry.getSignalType()) {
         case ROLL_WAL_LOG_WRITER_SIGNAL:
           rollWALFileWriterListener = signalWALEntry.getWalFlushListener();
-          fsyncWorkingBuffer(fsyncListeners, rollWALFileWriterListener);
+          fsyncWorkingBuffer(currentSearchIndex, fsyncListeners, rollWALFileWriterListener);
           return true;
         case CLOSE_SIGNAL:
           boolean dataExists = batchSize > 0;
           if (dataExists) {
-            fsyncWorkingBuffer(fsyncListeners, rollWALFileWriterListener);
+            fsyncWorkingBuffer(currentSearchIndex, fsyncListeners, rollWALFileWriterListener);
           }
           return dataExists;
         default:
@@ -240,7 +249,7 @@ public class WALBuffer extends AbstractWALBuffer {
     }
 
     private void rollBuffer() {
-      syncWorkingBuffer();
+      syncWorkingBuffer(currentSearchIndex);
     }
 
     @Override
@@ -304,16 +313,19 @@ public class WALBuffer extends AbstractWALBuffer {
   }
 
   /** Notice: this method only called when buffer is exhausted by SerializeTask. */
-  private void syncWorkingBuffer() {
+  private void syncWorkingBuffer(long searchIndex) {
     switchWorkingBufferToFlushing();
-    syncBufferThread.submit(new SyncBufferTask(false));
+    syncBufferThread.submit(new SyncBufferTask(searchIndex, false));
   }
 
   /** Notice: this method only called at the last of SerializeTask. */
   private void fsyncWorkingBuffer(
-      List<WALFlushListener> fsyncListeners, WALFlushListener rollWALFileWriterListener) {
+      long searchIndex,
+      List<WALFlushListener> fsyncListeners,
+      WALFlushListener rollWALFileWriterListener) {
     switchWorkingBufferToFlushing();
-    syncBufferThread.submit(new SyncBufferTask(true, fsyncListeners, rollWALFileWriterListener));
+    syncBufferThread.submit(
+        new SyncBufferTask(searchIndex, true, fsyncListeners, rollWALFileWriterListener));
   }
 
   // only called by serializeThread
@@ -341,18 +353,21 @@ public class WALBuffer extends AbstractWALBuffer {
    * This task syncs syncingBuffer to disk. The precondition is that syncingBuffer cannot be null.
    */
   private class SyncBufferTask implements Runnable {
+    private final long searchIndex;
     private final boolean forceFlag;
     private final List<WALFlushListener> fsyncListeners;
     private final WALFlushListener rollWALFileWriterListener;
 
-    public SyncBufferTask(boolean forceFlag) {
-      this(forceFlag, null, null);
+    public SyncBufferTask(long searchIndex, boolean forceFlag) {
+      this(searchIndex, forceFlag, null, null);
     }
 
     public SyncBufferTask(
+        long searchIndex,
         boolean forceFlag,
         List<WALFlushListener> fsyncListeners,
         WALFlushListener rollWALFileWriterListener) {
+      this.searchIndex = searchIndex;
       this.forceFlag = forceFlag;
       this.fsyncListeners = fsyncListeners == null ? Collections.emptyList() : fsyncListeners;
       this.rollWALFileWriterListener = rollWALFileWriterListener;
@@ -396,7 +411,7 @@ public class WALBuffer extends AbstractWALBuffer {
         if (rollWALFileWriterListener != null
             || (forceFlag
                 && currentWALFileWriter.size() >= config.getWalFileSizeThresholdInByte())) {
-          rollLogWriter();
+          rollLogWriter(searchIndex);
           if (rollWALFileWriterListener != null) {
             rollWALFileWriterListener.succeed();
           }
@@ -422,7 +437,27 @@ public class WALBuffer extends AbstractWALBuffer {
       // and there is only one buffer can be null between syncingBuffer and idleBuffer
       idleBuffer = syncingBuffer;
       syncingBuffer = null;
-      idleBufferReadyCondition.signal();
+      idleBufferReadyCondition.signalAll();
+    } finally {
+      buffersLock.unlock();
+    }
+  }
+
+  @Override
+  public void waitForFlush() throws InterruptedException {
+    buffersLock.lock();
+    try {
+      idleBufferReadyCondition.await();
+    } finally {
+      buffersLock.unlock();
+    }
+  }
+
+  @Override
+  public boolean waitForFlush(long time, TimeUnit unit) throws InterruptedException {
+    buffersLock.lock();
+    try {
+      return idleBufferReadyCondition.await(time, unit);
     } finally {
       buffersLock.unlock();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java b/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java
index 374d426a9f..ee981f70ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.wal.io.CheckpointWriter;
 import org.apache.iotdb.db.wal.io.ILogWriter;
+import org.apache.iotdb.db.wal.utils.CheckpointFileUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,9 +42,6 @@ import java.util.concurrent.locks.ReentrantLock;
 
 /** This class is used to manage checkpoints of one wal node */
 public class CheckpointManager implements AutoCloseable {
-  /** use size limit to control WALEntry number in each file */
-  public static final long LOG_SIZE_LIMIT = 3 * 1024 * 1024;
-
   private static final Logger logger = LoggerFactory.getLogger(CheckpointManager.class);
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
@@ -77,7 +75,7 @@ public class CheckpointManager implements AutoCloseable {
     currentLogWriter =
         new CheckpointWriter(
             SystemFileFactory.INSTANCE.getFile(
-                logDirectory, CheckpointWriter.getLogFileName(currentCheckPointFileVersion)));
+                logDirectory, CheckpointFileUtils.getLogFileName(currentCheckPointFileVersion)));
     makeGlobalInfoCP();
   }
 
@@ -169,7 +167,8 @@ public class CheckpointManager implements AutoCloseable {
           currentLogWriter.force();
           File oldFile =
               SystemFileFactory.INSTANCE.getFile(
-                  logDirectory, CheckpointWriter.getLogFileName(currentCheckPointFileVersion - 1));
+                  logDirectory,
+                  CheckpointFileUtils.getLogFileName(currentCheckPointFileVersion - 1));
           oldFile.delete();
         }
       } catch (IOException e) {
@@ -185,14 +184,14 @@ public class CheckpointManager implements AutoCloseable {
   }
 
   private boolean tryRollingLogWriter() throws IOException {
-    if (currentLogWriter.size() < LOG_SIZE_LIMIT) {
+    if (currentLogWriter.size() < config.getCheckpointFileSizeThresholdInByte()) {
       return false;
     }
     currentLogWriter.close();
     currentCheckPointFileVersion++;
     File nextLogFile =
         SystemFileFactory.INSTANCE.getFile(
-            logDirectory, CheckpointWriter.getLogFileName(currentCheckPointFileVersion));
+            logDirectory, CheckpointFileUtils.getLogFileName(currentCheckPointFileVersion));
     currentLogWriter = new CheckpointWriter(nextLogFile);
     return true;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/CheckpointWriter.java b/server/src/main/java/org/apache/iotdb/db/wal/io/CheckpointWriter.java
index 8fa34c0e6a..e07a107b3c 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/io/CheckpointWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/CheckpointWriter.java
@@ -18,43 +18,13 @@
  */
 package org.apache.iotdb.db.wal.io;
 
-import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.wal.checkpoint.Checkpoint;
 
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 /** CheckpointWriter writes the binary {@link Checkpoint} into .checkpoint file. */
 public class CheckpointWriter extends LogWriter {
-  public static final String FILE_SUFFIX = IoTDBConstant.WAL_CHECKPOINT_FILE_SUFFIX;
-  public static final Pattern CHECKPOINT_FILE_NAME_PATTERN =
-      Pattern.compile("_(?<versionId>\\d+)\\.checkpoint");
-
-  /** Return true when this file is .checkpoint file */
-  public static boolean checkpointFilenameFilter(File dir, String name) {
-    return CHECKPOINT_FILE_NAME_PATTERN.matcher(name).find();
-  }
-
-  /**
-   * Parse version id from filename
-   *
-   * @return Return {@link Integer#MIN_VALUE} when this file is not .checkpoint file
-   */
-  public static int parseVersionId(String filename) {
-    Matcher matcher = CHECKPOINT_FILE_NAME_PATTERN.matcher(filename);
-    if (matcher.find()) {
-      return Integer.parseInt(matcher.group("versionId"));
-    }
-    return Integer.MIN_VALUE;
-  }
-
-  /** Get .checkpoint filename */
-  public static String getLogFileName(long version) {
-    return FILE_PREFIX + version + FILE_SUFFIX;
-  }
-
   public CheckpointWriter(File logFile) throws FileNotFoundException {
     super(logFile);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java b/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java
index 34f620297f..9c9e6174b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java
@@ -18,42 +18,13 @@
  */
 package org.apache.iotdb.db.wal.io;
 
-import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.wal.buffer.WALEntry;
 
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 /** WALWriter writes the binary {@link WALEntry} into .wal file. */
 public class WALWriter extends LogWriter {
-  public static final String FILE_SUFFIX = IoTDBConstant.WAL_FILE_SUFFIX;
-  public static final Pattern WAL_FILE_NAME_PATTERN = Pattern.compile("_(?<versionId>\\d+)\\.wal");
-
-  /** Return true when this file is .wal file */
-  public static boolean walFilenameFilter(File dir, String name) {
-    return WAL_FILE_NAME_PATTERN.matcher(name).find();
-  }
-
-  /**
-   * Parse version id from filename
-   *
-   * @return Return {@link Integer#MIN_VALUE} when this file is not .wal file
-   */
-  public static int parseVersionId(String filename) {
-    Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(filename);
-    if (matcher.find()) {
-      return Integer.parseInt(matcher.group("versionId"));
-    }
-    return Integer.MIN_VALUE;
-  }
-
-  /** Get .wal filename */
-  public static String getLogFileName(long version) {
-    return FILE_PREFIX + version + FILE_SUFFIX;
-  }
-
   public WALWriter(File logFile) throws FileNotFoundException {
     super(logFile);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index 6412658895..7b4d9dba8c 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.wal.node;
 
+import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.commons.path.PartialPath;
@@ -30,7 +31,12 @@ import org.apache.iotdb.db.engine.flush.FlushStatus;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
@@ -39,9 +45,11 @@ import org.apache.iotdb.db.wal.buffer.IWALBuffer;
 import org.apache.iotdb.db.wal.buffer.SignalWALEntry;
 import org.apache.iotdb.db.wal.buffer.WALBuffer;
 import org.apache.iotdb.db.wal.buffer.WALEntry;
+import org.apache.iotdb.db.wal.buffer.WALEntryType;
 import org.apache.iotdb.db.wal.checkpoint.CheckpointManager;
 import org.apache.iotdb.db.wal.checkpoint.MemTableInfo;
-import org.apache.iotdb.db.wal.io.WALWriter;
+import org.apache.iotdb.db.wal.io.WALReader;
+import org.apache.iotdb.db.wal.utils.WALFileUtils;
 import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.utils.TsFileUtils;
@@ -51,17 +59,28 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-/** This class encapsulates {@link IWALBuffer} and {@link CheckpointManager}. */
+/**
+ * This class encapsulates {@link IWALBuffer} and {@link CheckpointManager}. If search is enabled,
+ * the order of search index should be protected by the upper layer, and the value should start from
+ * 1.
+ */
 public class WALNode implements IWALNode {
-  public static final Pattern WAL_NODE_FOLDER_PATTERN = Pattern.compile("(?<nodeIdentifier>\\d+)");
+  public static final long DEFAULT_SAFELY_DELETED_SEARCH_INDEX =
+      InsertNode.DEFAULT_SAFELY_DELETED_SEARCH_INDEX;
 
   private static final Logger logger = LoggerFactory.getLogger(WALNode.class);
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -69,7 +88,7 @@ public class WALNode implements IWALNode {
   /** unique identifier of this WALNode */
   private final String identifier;
   /** directory to store this node's files */
-  private final String logDirectory;
+  private final File logDirectory;
   /** wal buffer */
   private final IWALBuffer buffer;
   /** manage checkpoints */
@@ -86,23 +105,19 @@ public class WALNode implements IWALNode {
   private final AtomicLong totalCostOfFlushedMemTables = new AtomicLong();
   /** version id -> cost sum of memTables flushed at this file version */
   private final Map<Integer, Long> walFileVersionId2MemTablesTotalCost = new ConcurrentHashMap<>();
+  /** insert nodes whose search index are before this value can be deleted safely */
+  private volatile long safelyDeletedSearchIndex = DEFAULT_SAFELY_DELETED_SEARCH_INDEX;
 
   public WALNode(String identifier, String logDirectory) throws FileNotFoundException {
     this.identifier = identifier;
-    this.logDirectory = logDirectory;
-    File logDirFile = SystemFileFactory.INSTANCE.getFile(logDirectory);
-    if (!logDirFile.exists() && logDirFile.mkdirs()) {
+    this.logDirectory = SystemFileFactory.INSTANCE.getFile(logDirectory);
+    if (!this.logDirectory.exists() && this.logDirectory.mkdirs()) {
       logger.info("create folder {} for wal node-{}.", logDirectory, identifier);
     }
     this.buffer = new WALBuffer(identifier, logDirectory);
     this.checkpointManager = new CheckpointManager(identifier, logDirectory);
   }
 
-  /** Return true when this folder wal node folder */
-  public static boolean walNodeFolderNameFilter(File dir, String name) {
-    return WAL_NODE_FOLDER_PATTERN.matcher(name).find();
-  }
-
   @Override
   public WALFlushListener log(int memTableId, InsertRowPlan insertRowPlan) {
     WALEntry walEntry = new WALEntry(memTableId, insertRowPlan);
@@ -111,6 +126,9 @@ public class WALNode implements IWALNode {
 
   @Override
   public WALFlushListener log(int memTableId, InsertRowNode insertRowNode) {
+    if (insertRowNode.getSafelyDeletedSearchIndex() != DEFAULT_SAFELY_DELETED_SEARCH_INDEX) {
+      safelyDeletedSearchIndex = insertRowNode.getSafelyDeletedSearchIndex();
+    }
     WALEntry walEntry = new WALEntry(memTableId, insertRowNode);
     return log(walEntry);
   }
@@ -125,6 +143,9 @@ public class WALNode implements IWALNode {
   @Override
   public WALFlushListener log(
       int memTableId, InsertTabletNode insertTabletNode, int start, int end) {
+    if (insertTabletNode.getSafelyDeletedSearchIndex() != DEFAULT_SAFELY_DELETED_SEARCH_INDEX) {
+      safelyDeletedSearchIndex = insertTabletNode.getSafelyDeletedSearchIndex();
+    }
     WALEntry walEntry = new WALEntry(memTableId, insertTabletNode, start, end);
     return log(walEntry);
   }
@@ -192,15 +213,7 @@ public class WALNode implements IWALNode {
       firstValidVersionId = checkpointManager.getFirstValidWALVersionId();
       if (firstValidVersionId == Integer.MIN_VALUE) {
         // roll wal log writer to delete current wal file
-        WALEntry rollWALFileSignal =
-            new SignalWALEntry(SignalWALEntry.SignalType.ROLL_WAL_LOG_WRITER_SIGNAL, true);
-        WALFlushListener walFlushListener = log(rollWALFileSignal);
-        if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
-          logger.error(
-              "Fail to trigger rolling wal node-{}'s wal file log writer.",
-              identifier,
-              walFlushListener.getCause());
-        }
+        rollWALFile();
         // update firstValidVersionId
         firstValidVersionId = checkpointManager.getFirstValidWALVersionId();
         if (firstValidVersionId == Integer.MIN_VALUE) {
@@ -211,6 +224,11 @@ public class WALNode implements IWALNode {
       // delete outdated files
       deleteOutdatedFiles();
 
+      // wal is used to search, cannot optimize files deletion
+      if (safelyDeletedSearchIndex != DEFAULT_SAFELY_DELETED_SEARCH_INDEX) {
+        return;
+      }
+
       // calculate effective information ratio
       long costOfActiveMemTables = checkpointManager.getTotalCostOfActiveMemTables();
       long costOfFlushedMemTables = totalCostOfFlushedMemTables.get();
@@ -236,15 +254,14 @@ public class WALNode implements IWALNode {
     }
 
     private void deleteOutdatedFiles() {
-      File directory = SystemFileFactory.INSTANCE.getFile(logDirectory);
-      File[] filesToDelete = directory.listFiles(this::filterFilesToDelete);
+      File[] filesToDelete = logDirectory.listFiles(this::filterFilesToDelete);
       if (filesToDelete != null) {
         for (File file : filesToDelete) {
           if (!file.delete()) {
             logger.info("Fail to delete outdated wal file {} of wal node-{}.", file, identifier);
           }
           // update totalRamCostOfFlushedMemTables
-          int versionId = WALWriter.parseVersionId(file.getName());
+          int versionId = WALFileUtils.parseVersionId(file.getName());
           Long memTableRamCostSum = walFileVersionId2MemTablesTotalCost.remove(versionId);
           if (memTableRamCostSum != null) {
             totalCostOfFlushedMemTables.addAndGet(-memTableRamCostSum);
@@ -254,12 +271,13 @@ public class WALNode implements IWALNode {
     }
 
     private boolean filterFilesToDelete(File dir, String name) {
-      Pattern pattern = WALWriter.WAL_FILE_NAME_PATTERN;
+      Pattern pattern = WALFileUtils.WAL_FILE_NAME_PATTERN;
       Matcher matcher = pattern.matcher(name);
       boolean toDelete = false;
       if (matcher.find()) {
-        int versionId = Integer.parseInt(matcher.group("versionId"));
-        toDelete = versionId < firstValidVersionId;
+        int versionId = Integer.parseInt(matcher.group(IoTDBConstant.WAL_VERSION_ID));
+        long startSearchIndex = Long.parseLong(matcher.group(IoTDBConstant.WAL_START_SEARCH_INDEX));
+        toDelete = versionId < firstValidVersionId && startSearchIndex < safelyDeletedSearchIndex;
       }
       return toDelete;
     }
@@ -377,45 +395,378 @@ public class WALNode implements IWALNode {
   // endregion
 
   // region Search interfaces for consensus group
+  public void setSafelyDeletedSearchIndex(long safelyDeletedSearchIndex) {
+    this.safelyDeletedSearchIndex = safelyDeletedSearchIndex;
+  }
+
+  /**
+   * Merge insert nodes sharing same search index ( e.g. tablet-100, tablet-100, tablet-100 will be
+   * merged to one multi-tablet). <br>
+   * Notice: the continuity of insert nodes sharing same search index should be protected by the
+   * upper layer.
+   */
+  private static InsertNode mergeInsertNodes(List<InsertNode> insertNodes) {
+    int size = insertNodes.size();
+    if (size == 0) {
+      return null;
+    }
+    if (size == 1) {
+      return insertNodes.get(0);
+    }
+
+    InsertNode result;
+    if (insertNodes.get(0) instanceof InsertTabletNode) { // merge to InsertMultiTabletsNode
+      List<Integer> index = new ArrayList<>(size);
+      List<InsertTabletNode> insertTabletNodes = new ArrayList<>(size);
+      int i = 0;
+      for (InsertNode insertNode : insertNodes) {
+        insertTabletNodes.add((InsertTabletNode) insertNode);
+        index.add(i);
+        i++;
+      }
+      result = new InsertMultiTabletsNode(new PlanNodeId(""), index, insertTabletNodes);
+    } else { // merge to InsertRowsNode or InsertRowsOfOneDeviceNode
+      boolean sameDevice = true;
+      PartialPath device = insertNodes.get(0).getDevicePath();
+      List<Integer> index = new ArrayList<>(size);
+      List<InsertRowNode> insertRowNodes = new ArrayList<>(size);
+      int i = 0;
+      for (InsertNode insertNode : insertNodes) {
+        if (sameDevice && !insertNode.getDevicePath().equals(device)) {
+          sameDevice = false;
+        }
+        insertRowNodes.add((InsertRowNode) insertNode);
+        index.add(i);
+        i++;
+      }
+      result =
+          sameDevice
+              ? new InsertRowsOfOneDeviceNode(new PlanNodeId(""), index, insertRowNodes)
+              : new InsertRowsNode(new PlanNodeId(""), index, insertRowNodes);
+    }
+    result.setSearchIndex(insertNodes.get(0).getSearchIndex());
+
+    return result;
+  }
+
   @Override
   public IConsensusRequest getReq(long index) {
+    // find file
+    File[] currentFiles = WALFileUtils.listAllWALFiles(logDirectory);
+    WALFileUtils.ascSortByVersionId(currentFiles);
+    int fileIndex = WALFileUtils.binarySearchFileBySearchIndex(currentFiles, index);
+    if (fileIndex < 0) {
+      return null;
+    }
+    // find log
+    List<InsertNode> tmpNodes = new ArrayList<>();
+    for (int i = fileIndex; i < currentFiles.length; i++) {
+      // cannot find anymore
+      if (index < WALFileUtils.parseStartSearchIndex(currentFiles[i].getName())) {
+        if (!tmpNodes.isEmpty()) {
+          return mergeInsertNodes(tmpNodes);
+        } else {
+          break;
+        }
+      }
+
+      try (WALReader walReader = new WALReader(currentFiles[i])) {
+        while (walReader.hasNext()) {
+          WALEntry walEntry = walReader.next();
+          if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
+              || walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
+            InsertNode insertNode = (InsertNode) walEntry.getValue();
+            if (insertNode.getSearchIndex() == index) {
+              tmpNodes.add(insertNode);
+            } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
+              return mergeInsertNodes(tmpNodes);
+            }
+          } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
+            return mergeInsertNodes(tmpNodes);
+          }
+        }
+      } catch (Exception e) {
+        logger.error("Fail to read wal from wal file {}", currentFiles[i], e);
+      }
+    }
+    // not find or not complete
     return null;
   }
 
   @Override
   public List<IConsensusRequest> getReqs(long startIndex, int num) {
-    return null;
+    List<IConsensusRequest> result = new ArrayList<>(num);
+    // find file
+    File[] currentFiles = WALFileUtils.listAllWALFiles(logDirectory);
+    WALFileUtils.ascSortByVersionId(currentFiles);
+    int fileIndex = WALFileUtils.binarySearchFileBySearchIndex(currentFiles, startIndex);
+    if (fileIndex < 0) {
+      return result;
+    }
+    // find logs
+    long endIndex = startIndex + num - 1;
+    long targetIndex = startIndex;
+    List<InsertNode> tmpNodes = new ArrayList<>();
+    for (int i = fileIndex; i < currentFiles.length; i++) {
+      // cannot find anymore
+      if (endIndex < WALFileUtils.parseStartSearchIndex(currentFiles[i].getName())) {
+        if (!tmpNodes.isEmpty()) {
+          result.add(mergeInsertNodes(tmpNodes));
+        } else {
+          break;
+        }
+      }
+
+      try (WALReader walReader = new WALReader(currentFiles[i])) {
+        while (walReader.hasNext()) {
+          WALEntry walEntry = walReader.next();
+          if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
+              || walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
+            InsertNode insertNode = (InsertNode) walEntry.getValue();
+            if (insertNode.getSearchIndex() == targetIndex) {
+              tmpNodes.add(insertNode);
+            } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
+              result.add(mergeInsertNodes(tmpNodes));
+              if (result.size() == num) {
+                return result;
+              }
+              targetIndex++;
+              tmpNodes = new ArrayList<>();
+              // remember to add current insert node
+              if (insertNode.getSearchIndex() == targetIndex) {
+                tmpNodes.add(insertNode);
+              }
+            }
+          } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
+            result.add(mergeInsertNodes(tmpNodes));
+            if (result.size() == num) {
+              return result;
+            }
+            targetIndex++;
+            tmpNodes = new ArrayList<>();
+          }
+        }
+      } catch (Exception e) {
+        logger.error("Fail to read wal from wal file {}", currentFiles[i], e);
+      }
+    }
+
+    return result;
   }
 
+  /** This iterator is not concurrency-safe */
   @Override
   public ReqIterator getReqIterator(long startIndex) {
-    return new PlanNodeIterator();
+    return new PlanNodeIterator(startIndex);
   }
 
   private class PlanNodeIterator implements ReqIterator {
+    /** search index of next element */
+    private long nextSearchIndex;
+    /** files to search */
+    private File[] filesToSearch = null;
+    /** index of current searching file in the filesToSearch */
+    private int currentFileIndex = -1;
+    /** true means filesToSearch and currentFileIndex are outdated, call updateFilesToSearch */
+    private boolean needUpdatingFilesToSearch = true;
+    /**
+     * files whose version id before this value have already been searched, avoid storing too many
+     * files in filesToSearch
+     */
+    private int searchedFilesVersionId = 0;
+    /** batch store insert nodes */
+    private final List<InsertNode> insertNodes = new LinkedList<>();
+    /** iterator of insertNodes */
+    private Iterator<InsertNode> itr = null;
+
+    public PlanNodeIterator(long startIndex) {
+      this.nextSearchIndex = startIndex;
+    }
+
     @Override
     public boolean hasNext() {
+      if (itr != null && itr.hasNext()) {
+        return true;
+      }
+
+      insertNodes.clear();
+      itr = null;
+
+      if (needUpdatingFilesToSearch || filesToSearch == null) {
+        updateFilesToSearch();
+        if (needUpdatingFilesToSearch) {
+          return false;
+        }
+      }
+
+      // find all insert plan of current wal file
+      List<InsertNode> tmpNodes = new ArrayList<>();
+      long targetIndex = nextSearchIndex;
+      try (WALReader walReader = new WALReader(filesToSearch[currentFileIndex])) {
+        while (walReader.hasNext()) {
+          WALEntry walEntry = walReader.next();
+          if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
+              || walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
+            InsertNode insertNode = (InsertNode) walEntry.getValue();
+            if (insertNode.getSearchIndex() == targetIndex) {
+              tmpNodes.add(insertNode);
+            } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
+              insertNodes.add(mergeInsertNodes(tmpNodes));
+              targetIndex++;
+              tmpNodes = new ArrayList<>();
+              // remember to add current insert node
+              if (insertNode.getSearchIndex() == targetIndex) {
+                tmpNodes.add(insertNode);
+              }
+            }
+          } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
+            insertNodes.add(mergeInsertNodes(tmpNodes));
+            targetIndex++;
+            tmpNodes = new ArrayList<>();
+          }
+        }
+      } catch (Exception e) {
+        logger.error("Fail to read wal from wal file {}", filesToSearch[currentFileIndex], e);
+      }
+
+      // find remaining slices of last insert plan of targetIndex
+      if (tmpNodes.isEmpty()) { // all insert plans scanned
+        currentFileIndex++;
+      } else {
+        int fileIndex = currentFileIndex + 1;
+        while (!tmpNodes.isEmpty() && fileIndex < filesToSearch.length) {
+          try (WALReader walReader = new WALReader(filesToSearch[fileIndex])) {
+            while (walReader.hasNext()) {
+              WALEntry walEntry = walReader.next();
+              if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
+                  || walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
+                InsertNode insertNode = (InsertNode) walEntry.getValue();
+                if (insertNode.getSearchIndex() == targetIndex) {
+                  tmpNodes.add(insertNode);
+                } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
+                  insertNodes.add(mergeInsertNodes(tmpNodes));
+                  tmpNodes = Collections.emptyList();
+                  break;
+                }
+              } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
+                insertNodes.add(mergeInsertNodes(tmpNodes));
+                tmpNodes = Collections.emptyList();
+                break;
+              }
+            }
+          } catch (Exception e) {
+            logger.error("Fail to read wal from wal file {}", filesToSearch[currentFileIndex], e);
+          }
+          if (!tmpNodes.isEmpty()) {
+            fileIndex++;
+          }
+        }
+
+        if (tmpNodes.isEmpty()) { // all insert plans scanned
+          currentFileIndex = fileIndex;
+        } else {
+          needUpdatingFilesToSearch = true;
+        }
+      }
+
+      // update file index and version id
+      if (currentFileIndex >= filesToSearch.length) {
+        needUpdatingFilesToSearch = true;
+      } else {
+        searchedFilesVersionId =
+            WALFileUtils.parseVersionId(filesToSearch[currentFileIndex].getName());
+      }
+
+      // update iterator
+      if (insertNodes.size() != 0) {
+        itr = insertNodes.iterator();
+        return true;
+      }
       return false;
     }
 
     @Override
     public IConsensusRequest next() {
-      return null;
+      if (itr == null && !hasNext()) {
+        throw new NoSuchElementException();
+      }
+
+      InsertNode insertNode = itr.next();
+      if (insertNode.getSearchIndex() != nextSearchIndex) {
+        logger.warn(
+            "Search index of wal node-{} are not continuously, skip from {} to {}.",
+            identifier,
+            nextSearchIndex,
+            insertNode.getSearchIndex());
+      }
+      nextSearchIndex = insertNode.getSearchIndex() + 1;
+
+      return insertNode;
     }
 
     @Override
-    public IConsensusRequest waitForNext() throws InterruptedException {
-      return null;
+    public void waitForNextReady() throws InterruptedException {
+      while (!hasNext()) {
+        buffer.waitForFlush();
+      }
     }
 
     @Override
-    public IConsensusRequest waitForNext(long timeout)
+    public void waitForNextReady(long time, TimeUnit unit)
         throws InterruptedException, TimeoutException {
-      return null;
+      if (!hasNext()) {
+        boolean timeout = !buffer.waitForFlush(time, unit);
+        if (timeout || !hasNext()) {
+          throw new TimeoutException();
+        }
+      }
     }
 
     @Override
-    public void skipTo(long targetIndex) {}
+    public void skipTo(long targetIndex) {
+      if (targetIndex < nextSearchIndex) {
+        logger.warn(
+            "Skip from {} to {}, it's a dangerous operation because insert plan {} may have been lost.",
+            nextSearchIndex,
+            targetIndex,
+            targetIndex);
+        searchedFilesVersionId = -1;
+        insertNodes.clear();
+        itr = null;
+      }
+      nextSearchIndex = targetIndex;
+      this.filesToSearch = null;
+      this.currentFileIndex = -1;
+      needUpdatingFilesToSearch = true;
+    }
+
+    private void updateFilesToSearch() {
+      File[] filesToSearch = logDirectory.listFiles(this::filterFilesToSearch);
+      WALFileUtils.ascSortByVersionId(filesToSearch);
+      int fileIndex = WALFileUtils.binarySearchFileBySearchIndex(filesToSearch, nextSearchIndex);
+      if (filesToSearch != null && fileIndex >= 0) { // possible to find next
+        this.filesToSearch = filesToSearch;
+        this.currentFileIndex = fileIndex;
+        this.searchedFilesVersionId =
+            WALFileUtils.parseVersionId(this.filesToSearch[currentFileIndex].getName());
+        this.needUpdatingFilesToSearch = false;
+      } else { // impossible to find next
+        this.filesToSearch = null;
+        this.currentFileIndex = -1;
+        this.needUpdatingFilesToSearch = true;
+      }
+    }
+
+    private boolean filterFilesToSearch(File dir, String name) {
+      Pattern pattern = WALFileUtils.WAL_FILE_NAME_PATTERN;
+      Matcher matcher = pattern.matcher(name);
+      boolean toSearch = false;
+      if (matcher.find()) {
+        int versionId = Integer.parseInt(matcher.group(IoTDBConstant.WAL_VERSION_ID));
+        toSearch = versionId >= searchedFilesVersionId;
+      }
+      return toSearch;
+    }
   }
   // endregion
 
@@ -434,4 +785,17 @@ public class WALNode implements IWALNode {
   int getCurrentLogVersion() {
     return buffer.getCurrentWALFileVersion();
   }
+
+  @TestOnly
+  public void rollWALFile() {
+    WALEntry rollWALFileSignal =
+        new SignalWALEntry(SignalWALEntry.SignalType.ROLL_WAL_LOG_WRITER_SIGNAL, true);
+    WALFlushListener walFlushListener = log(rollWALFileSignal);
+    if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
+      logger.error(
+          "Fail to trigger rolling wal node-{}'s wal file log writer.",
+          identifier,
+          walFlushListener.getCause());
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/CheckpointRecoverUtils.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/CheckpointRecoverUtils.java
index 20d63c1e79..dbfe1934e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/CheckpointRecoverUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/CheckpointRecoverUtils.java
@@ -21,12 +21,10 @@ package org.apache.iotdb.db.wal.recover;
 import org.apache.iotdb.db.wal.checkpoint.Checkpoint;
 import org.apache.iotdb.db.wal.checkpoint.MemTableInfo;
 import org.apache.iotdb.db.wal.io.CheckpointReader;
-import org.apache.iotdb.db.wal.io.CheckpointWriter;
+import org.apache.iotdb.db.wal.utils.CheckpointFileUtils;
 
 import java.io.File;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -37,14 +35,12 @@ public class CheckpointRecoverUtils {
   /** Recover memTable information from checkpoint folder */
   public static Map<Integer, MemTableInfo> recoverMemTableInfo(File logDirectory) {
     // find all .checkpoint file
-    File[] checkpointFiles = logDirectory.listFiles(CheckpointWriter::checkpointFilenameFilter);
+    File[] checkpointFiles = CheckpointFileUtils.listAllCheckpointFiles(logDirectory);
     if (checkpointFiles == null) {
       return Collections.emptyMap();
     }
-    Arrays.sort(
-        checkpointFiles,
-        Comparator.comparingInt(file -> CheckpointWriter.parseVersionId(((File) file).getName()))
-            .reversed());
+    // desc sort by version id
+    CheckpointFileUtils.descSortByVersionId(checkpointFiles);
     // find last valid .checkpoint file and load checkpoints from it
     List<Checkpoint> checkpoints = null;
     for (File checkpointFile : checkpointFiles) {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java
index 477e93a408..c824f00d25 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java
@@ -19,18 +19,19 @@
 package org.apache.iotdb.db.wal.recover;
 
 import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.wal.WALManager;
 import org.apache.iotdb.db.wal.buffer.WALEntry;
 import org.apache.iotdb.db.wal.checkpoint.MemTableInfo;
 import org.apache.iotdb.db.wal.io.WALReader;
-import org.apache.iotdb.db.wal.io.WALWriter;
 import org.apache.iotdb.db.wal.recover.file.UnsealedTsFileRecoverPerformer;
+import org.apache.iotdb.db.wal.utils.WALFileUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.util.Arrays;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -38,6 +39,7 @@ import java.util.concurrent.CountDownLatch;
 /** This task is responsible for the recovery of one wal node. */
 public class WALNodeRecoverTask implements Runnable {
   private static final Logger logger = LoggerFactory.getLogger(WALNodeRecoverTask.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private static final WALRecoverManager walRecoverManger = WALRecoverManager.getInstance();
 
   /** this directory store one wal node's .wal and .checkpoint files */
@@ -77,11 +79,20 @@ public class WALNodeRecoverTask implements Runnable {
         }
       }
     }
-    // delete this wal node folder
-    FileUtils.deleteDirectory(logDirectory);
-    logger.info(
-        "Successfully recover WAL node in the directory {}, so delete these wal files.",
-        logDirectory);
+
+    if (!config.isClusterMode()) {
+      // delete this wal node folder
+      FileUtils.deleteDirectory(logDirectory);
+      logger.info(
+          "Successfully recover WAL node in the directory {}, so delete these wal files.",
+          logDirectory);
+    } else {
+      WALManager.getInstance()
+          .registerWALNode(logDirectory.getName(), logDirectory.getAbsolutePath());
+      logger.info(
+          "Successfully recover WAL node in the directory {}, add this node to WALManger.",
+          logDirectory);
+    }
   }
 
   private void recoverInfoFromCheckpoints() {
@@ -116,12 +127,14 @@ public class WALNodeRecoverTask implements Runnable {
     // find all valid .wal files
     File[] walFiles =
         logDirectory.listFiles(
-            (dir, name) -> WALWriter.parseVersionId(name) >= firstValidVersionId);
+            (dir, name) ->
+                WALFileUtils.walFilenameFilter(dir, name)
+                    && WALFileUtils.parseVersionId(name) >= firstValidVersionId);
     if (walFiles == null) {
       return;
     }
-    Arrays.sort(
-        walFiles, Comparator.comparingInt(file -> WALWriter.parseVersionId(file.getName())));
+    // asc sort by version id
+    WALFileUtils.ascSortByVersionId(walFiles);
     // read .wal files and redo logs
     for (File walFile : walFiles) {
       try (WALReader walReader = new WALReader(walFile)) {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
index 3fd638225a..fff680ee69 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.DataRegionException;
 import org.apache.iotdb.db.wal.exception.WALRecoverException;
-import org.apache.iotdb.db.wal.node.WALNode;
 import org.apache.iotdb.db.wal.recover.file.UnsealedTsFileRecoverPerformer;
 import org.apache.iotdb.db.wal.utils.listener.WALRecoverListener;
 
@@ -64,7 +63,7 @@ public class WALRecoverManager {
       List<File> walNodeDirs = new ArrayList<>();
       for (String walDir : config.getWalDirs()) {
         File walDirFile = SystemFileFactory.INSTANCE.getFile(walDir);
-        File[] nodeDirs = walDirFile.listFiles(WALNode::walNodeFolderNameFilter);
+        File[] nodeDirs = walDirFile.listFiles(File::isDirectory);
         if (nodeDirs == null) {
           continue;
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/CheckpointWriter.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/CheckpointFileUtils.java
similarity index 52%
copy from server/src/main/java/org/apache/iotdb/db/wal/io/CheckpointWriter.java
copy to server/src/main/java/org/apache/iotdb/db/wal/utils/CheckpointFileUtils.java
index 8fa34c0e6a..b8123354de 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/io/CheckpointWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/CheckpointFileUtils.java
@@ -16,46 +16,54 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.wal.io;
-
-import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.db.wal.checkpoint.Checkpoint;
+package org.apache.iotdb.db.wal.utils;
 
 import java.io.File;
-import java.io.FileNotFoundException;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-/** CheckpointWriter writes the binary {@link Checkpoint} into .checkpoint file. */
-public class CheckpointWriter extends LogWriter {
-  public static final String FILE_SUFFIX = IoTDBConstant.WAL_CHECKPOINT_FILE_SUFFIX;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.WAL_CHECKPOINT_FILE_SUFFIX;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.WAL_FILE_PREFIX;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.WAL_VERSION_ID;
+
+public class CheckpointFileUtils {
+  /**
+   * versionId is a self-incremented id number, helping to maintain the order of checkpoint files
+   */
   public static final Pattern CHECKPOINT_FILE_NAME_PATTERN =
-      Pattern.compile("_(?<versionId>\\d+)\\.checkpoint");
+      Pattern.compile(
+          WAL_FILE_PREFIX + "(?<" + WAL_VERSION_ID + ">\\d+)\\" + WAL_CHECKPOINT_FILE_SUFFIX);
 
   /** Return true when this file is .checkpoint file */
   public static boolean checkpointFilenameFilter(File dir, String name) {
     return CHECKPOINT_FILE_NAME_PATTERN.matcher(name).find();
   }
 
-  /**
-   * Parse version id from filename
-   *
-   * @return Return {@link Integer#MIN_VALUE} when this file is not .checkpoint file
-   */
+  /** List all .checkpoint files in the directory */
+  public static File[] listAllCheckpointFiles(File dir) {
+    return dir.listFiles(CheckpointFileUtils::checkpointFilenameFilter);
+  }
+
+  /** Parse version id from filename */
   public static int parseVersionId(String filename) {
     Matcher matcher = CHECKPOINT_FILE_NAME_PATTERN.matcher(filename);
     if (matcher.find()) {
-      return Integer.parseInt(matcher.group("versionId"));
+      return Integer.parseInt(matcher.group(WAL_VERSION_ID));
     }
-    return Integer.MIN_VALUE;
+    throw new RuntimeException("Invalid checkpoint file name: " + filename);
   }
 
-  /** Get .checkpoint filename */
-  public static String getLogFileName(long version) {
-    return FILE_PREFIX + version + FILE_SUFFIX;
+  /** Sort checkpoint files by version id with descending order * */
+  public static void descSortByVersionId(File[] checkpointFiles) {
+    Arrays.sort(
+        checkpointFiles,
+        Comparator.comparingInt(file -> parseVersionId(((File) file).getName())).reversed());
   }
 
-  public CheckpointWriter(File logFile) throws FileNotFoundException {
-    super(logFile);
+  /** Get .checkpoint filename */
+  public static String getLogFileName(long version) {
+    return WAL_FILE_PREFIX + version + WAL_CHECKPOINT_FILE_SUFFIX;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileUtils.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileUtils.java
new file mode 100644
index 0000000000..a67d323223
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileUtils.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.utils;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.WAL_FILE_PREFIX;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.WAL_FILE_SUFFIX;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.WAL_START_SEARCH_INDEX;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.WAL_VERSION_ID;
+
+public class WALFileUtils {
+  /**
+   * versionId is a self-incremented id number, helping to maintain the order of wal files.
+   * startSearchIndex is the valid search index of last flushed wal entry. For example: <br>
+   * &nbsp _0-0.wal: 1, 2, 3, -1, -1, 4, 5, -1 <br>
+   * &nbsp _1-5.wal: -1, -1, -1, -1 <br>
+   * &nbsp _2-5.wal: 6, 7, 8, 9, -1, -1, -1, 10, 11, -1, 12, 12 <br>
+   * &nbsp _3-12.wal: 12, 12, 12, 12, 12 <br>
+   * &nbsp _4-12.wal: 12, 13, 14, 15, 16, -1 <br>
+   */
+  public static final Pattern WAL_FILE_NAME_PATTERN =
+      Pattern.compile(
+          WAL_FILE_PREFIX
+              + "(?<"
+              + WAL_VERSION_ID
+              + ">\\d+)-(?<"
+              + WAL_START_SEARCH_INDEX
+              + ">\\d+)\\"
+              + WAL_FILE_SUFFIX);
+
+  /** Return true when this file is .wal file */
+  public static boolean walFilenameFilter(File dir, String name) {
+    return WAL_FILE_NAME_PATTERN.matcher(name).find();
+  }
+
+  /** List all .wal files in the directory */
+  public static File[] listAllWALFiles(File dir) {
+    return dir.listFiles(WALFileUtils::walFilenameFilter);
+  }
+
+  /** Parse version id from filename */
+  public static int parseVersionId(String filename) {
+    Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(filename);
+    if (matcher.find()) {
+      return Integer.parseInt(matcher.group(WAL_VERSION_ID));
+    }
+    throw new RuntimeException("Invalid wal file name: " + filename);
+  }
+
+  /** Parse start search index from filename */
+  public static long parseStartSearchIndex(String filename) {
+    Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(filename);
+    if (matcher.find()) {
+      return Long.parseLong(matcher.group(WAL_START_SEARCH_INDEX));
+    }
+    throw new RuntimeException("Invalid wal file name: " + filename);
+  }
+
+  /** Sort wal files by version id with ascending order */
+  public static void ascSortByVersionId(File[] walFiles) {
+    Arrays.sort(walFiles, Comparator.comparingInt(file -> parseVersionId(file.getName())));
+  }
+
+  /**
+   * Find index of the file which probably contains target insert plan. <br>
+   * Given wal files [ _0-0.wal, _1-5.wal, _2-5.wal, _3-12.wal, _4-12.wal ], details as below: <br>
+   * &nbsp _0-0.wal: 1, 2, 3, -1, -1, 4, 5, -1 <br>
+   * &nbsp _1-5.wal: -1, -1, -1, -1 <br>
+   * &nbsp _2-5.wal: 6, 7, 8, 9, -1, -1, -1, 10, 11, -1, 12, 12 <br>
+   * &nbsp _3-12.wal: 12, 12, 12, 12, 12 <br>
+   * &nbsp _4-12.wal: 12, 13, 14, 15, 16, -1 <br>
+   * searching [1, 5] will return 0, searching [6, 12] will return 1, search [13, infinity) will
+   * return 3, others will return -1
+   *
+   * @param files files to be searched
+   * @param targetSearchIndex search index of target insert plan
+   * @return index of the file which probably contains target insert plan , -1 if the target insert
+   *     plan definitely doesn't exist
+   */
+  public static int binarySearchFileBySearchIndex(File[] files, long targetSearchIndex) {
+    if (files == null
+        || files.length == 0
+        || targetSearchIndex <= parseStartSearchIndex(files[0].getName())) {
+      return -1;
+    }
+
+    if (targetSearchIndex > parseStartSearchIndex(files[files.length - 1].getName())) {
+      return files.length - 1;
+    }
+
+    int low = 0;
+    int high = files.length - 1;
+    // search file whose search index i < targetSearchIndex
+    while (low <= high) {
+      int mid = (low + high) >>> 1;
+      long midVal = parseStartSearchIndex(files[mid].getName());
+
+      if (midVal < targetSearchIndex) {
+        low = mid + 1;
+      } else {
+        high = mid - 1;
+      }
+    }
+
+    return low - 1;
+  }
+
+  /** Get .wal filename */
+  public static String getLogFileName(int versionId, long startSearchIndex) {
+    return WAL_FILE_PREFIX + versionId + FILE_NAME_SEPARATOR + startSearchIndex + WAL_FILE_SUFFIX;
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java b/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
index 3c5d5a36be..eb9b2d0058 100644
--- a/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.tools;
 
-import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.exception.SystemCheckException;
@@ -28,6 +27,7 @@ import org.apache.iotdb.db.wal.io.ILogWriter;
 import org.apache.iotdb.db.wal.io.WALFileTest;
 import org.apache.iotdb.db.wal.io.WALWriter;
 import org.apache.iotdb.db.wal.utils.WALByteBufferForTest;
+import org.apache.iotdb.db.wal.utils.WALFileUtils;
 
 import org.apache.commons.io.FileUtils;
 import org.junit.Test;
@@ -80,8 +80,7 @@ public class WalCheckerTest {
         File walNodeDir = new File(tempRoot, String.valueOf(i));
         walNodeDir.mkdir();
 
-        File walFile =
-            new File(walNodeDir, WALWriter.FILE_PREFIX + i + IoTDBConstant.WAL_FILE_SUFFIX);
+        File walFile = new File(walNodeDir, WALFileUtils.getLogFileName(i, 0));
         int fakeMemTableId = 1;
         List<WALEntry> walEntries = new ArrayList<>();
         walEntries.add(new WALEntry(fakeMemTableId, WALFileTest.getInsertRowPlan(DEVICE_ID)));
@@ -117,7 +116,7 @@ public class WalCheckerTest {
         File walNodeDir = new File(tempRoot, String.valueOf(i));
         walNodeDir.mkdir();
 
-        File walFile = new File(walNodeDir, "_" + i + IoTDBConstant.WAL_FILE_SUFFIX);
+        File walFile = new File(walNodeDir, WALFileUtils.getLogFileName(i, 0));
         int fakeMemTableId = 1;
         List<WALEntry> walEntries = new ArrayList<>();
         walEntries.add(new WALEntry(fakeMemTableId, WALFileTest.getInsertRowPlan(DEVICE_ID)));
@@ -158,7 +157,7 @@ public class WalCheckerTest {
         File walNodeDir = new File(tempRoot, String.valueOf(i));
         walNodeDir.mkdir();
 
-        File walFile = new File(walNodeDir, "_" + i + IoTDBConstant.WAL_FILE_SUFFIX);
+        File walFile = new File(walNodeDir, WALFileUtils.getLogFileName(i, 0));
 
         FileOutputStream fileOutputStream = new FileOutputStream(walFile);
         try {
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/DisableWALTest.java b/server/src/test/java/org/apache/iotdb/db/wal/DisableWALTest.java
index 8b9475c77e..d16365cea0 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/DisableWALTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/DisableWALTest.java
@@ -48,7 +48,7 @@ public class DisableWALTest {
   @Test
   public void testDisableWAL() {
     WALManager walManager = WALManager.getInstance();
-    IWALNode walNode = walManager.applyForWALNode();
+    IWALNode walNode = walManager.applyForWALNode("");
     assertEquals(WALFakeNode.getSuccessInstance(), walNode);
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/WALManagerTest.java b/server/src/test/java/org/apache/iotdb/db/wal/WALManagerTest.java
index 20ae8c8db0..83343f7859 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/WALManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/WALManagerTest.java
@@ -25,8 +25,8 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.db.wal.io.WALWriter;
-import org.apache.iotdb.db.wal.node.IWALNode;
+import org.apache.iotdb.db.wal.node.WALNode;
+import org.apache.iotdb.db.wal.utils.WALFileUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import org.junit.After;
@@ -66,22 +66,42 @@ public class WALManagerTest {
   }
 
   @Test
-  public void testAllocateWALNode() throws IllegalPathException {
+  public void testDeleteOutdatedWALFiles() throws IllegalPathException {
     WALManager walManager = WALManager.getInstance();
-    IWALNode[] walNodes = new IWALNode[6];
+    WALNode[] walNodes = new WALNode[6];
     for (int i = 0; i < 12; i++) {
-      IWALNode walNode = walManager.applyForWALNode();
+      WALNode walNode = (WALNode) walManager.applyForWALNode(String.valueOf(i));
       if (i < 6) {
         walNodes[i] = walNode;
       } else {
         assertEquals(walNodes[i % 6], walNode);
       }
       walNode.log(i, getInsertRowPlan());
+      walNode.rollWALFile();
     }
+
     for (String walDir : walDirs) {
       File walDirFile = new File(walDir);
       assertTrue(walDirFile.exists());
-      assertNotNull(walDirFile.list(WALWriter::walFilenameFilter));
+      File[] nodeDirs = walDirFile.listFiles(File::isDirectory);
+      assertNotNull(nodeDirs);
+      for (File nodeDir : nodeDirs) {
+        assertTrue(nodeDir.exists());
+        assertEquals(3, WALFileUtils.listAllWALFiles(nodeDir).length);
+      }
+    }
+
+    walManager.deleteOutdatedWALFiles();
+
+    for (String walDir : walDirs) {
+      File walDirFile = new File(walDir);
+      assertTrue(walDirFile.exists());
+      File[] nodeDirs = walDirFile.listFiles(File::isDirectory);
+      assertNotNull(nodeDirs);
+      for (File nodeDir : nodeDirs) {
+        assertTrue(nodeDir.exists());
+        assertEquals(1, WALFileUtils.listAllWALFiles(nodeDir).length);
+      }
     }
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/WALManagerTest.java b/server/src/test/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategyTest.java
similarity index 57%
copy from server/src/test/java/org/apache/iotdb/db/wal/WALManagerTest.java
copy to server/src/test/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategyTest.java
index 20ae8c8db0..8b36ee3f27 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/WALManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategyTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.wal;
+package org.apache.iotdb.db.wal.allocation;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
@@ -25,8 +25,8 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.db.wal.io.WALWriter;
 import org.apache.iotdb.db.wal.node.IWALNode;
+import org.apache.iotdb.db.wal.utils.WALFileUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import org.junit.After;
@@ -36,10 +36,11 @@ import org.junit.Test;
 import java.io.File;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-public class WALManagerTest {
+public class FirstCreateStrategyTest {
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private final String[] walDirs =
       new String[] {
@@ -67,21 +68,71 @@ public class WALManagerTest {
 
   @Test
   public void testAllocateWALNode() throws IllegalPathException {
-    WALManager walManager = WALManager.getInstance();
+    FirstCreateStrategy roundRobinStrategy = new FirstCreateStrategy();
     IWALNode[] walNodes = new IWALNode[6];
-    for (int i = 0; i < 12; i++) {
-      IWALNode walNode = walManager.applyForWALNode();
-      if (i < 6) {
-        walNodes[i] = walNode;
-      } else {
-        assertEquals(walNodes[i % 6], walNode);
+    try {
+      for (int i = 0; i < 12; i++) {
+        String identifier = String.valueOf(i % 6);
+        IWALNode walNode = roundRobinStrategy.applyForWALNode(identifier);
+        if (i < 6) {
+          walNodes[i] = walNode;
+        } else {
+          assertEquals(walNodes[i % 6], walNode);
+        }
+        walNode.log(i, getInsertRowPlan());
+      }
+      for (String walDir : walDirs) {
+        File walDirFile = new File(walDir);
+        assertTrue(walDirFile.exists());
+        File[] nodeDirs = walDirFile.listFiles(File::isDirectory);
+        assertNotNull(nodeDirs);
+        assertEquals(2, nodeDirs.length);
+        for (File nodeDir : nodeDirs) {
+          assertTrue(nodeDir.exists());
+          assertNotEquals(0, WALFileUtils.listAllWALFiles(nodeDir).length);
+        }
+      }
+    } finally {
+      for (IWALNode walNode : walNodes) {
+        if (walNode != null) {
+          walNode.close();
+        }
       }
-      walNode.log(i, getInsertRowPlan());
     }
-    for (String walDir : walDirs) {
-      File walDirFile = new File(walDir);
+  }
+
+  @Test
+  public void testRegisterWALNode() throws IllegalPathException {
+    FirstCreateStrategy roundRobinStrategy = new FirstCreateStrategy();
+    IWALNode[] walNodes = new IWALNode[6];
+    try {
+      for (int i = 0; i < 12; i++) {
+        String identifier = String.valueOf(i % 6);
+        roundRobinStrategy.registerWALNode(identifier, walDirs[0] + File.separator + identifier);
+        IWALNode walNode = roundRobinStrategy.applyForWALNode(identifier);
+        if (i < 6) {
+          walNodes[i] = walNode;
+        } else {
+          assertEquals(walNodes[i % 6], walNode);
+        }
+        walNode.log(i, getInsertRowPlan());
+      }
+
+      File walDirFile = new File(walDirs[0]);
       assertTrue(walDirFile.exists());
-      assertNotNull(walDirFile.list(WALWriter::walFilenameFilter));
+      File[] nodeDirs = walDirFile.listFiles(File::isDirectory);
+      assertNotNull(nodeDirs);
+      assertEquals(6, nodeDirs.length);
+      for (File nodeDir : nodeDirs) {
+        assertTrue(nodeDir.exists());
+        assertNotEquals(0, WALFileUtils.listAllWALFiles(nodeDir).length);
+      }
+    } finally {
+      for (IWALNode walNode : walNodes) {
+        if (walNode != null) {
+          walNode.close();
+        }
+      }
     }
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/WALManagerTest.java b/server/src/test/java/org/apache/iotdb/db/wal/allocation/RoundRobinStrategyTest.java
similarity index 73%
copy from server/src/test/java/org/apache/iotdb/db/wal/WALManagerTest.java
copy to server/src/test/java/org/apache/iotdb/db/wal/allocation/RoundRobinStrategyTest.java
index 20ae8c8db0..cdf0c2b4ac 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/WALManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/allocation/RoundRobinStrategyTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.wal;
+package org.apache.iotdb.db.wal.allocation;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
@@ -25,8 +25,8 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.db.wal.io.WALWriter;
 import org.apache.iotdb.db.wal.node.IWALNode;
+import org.apache.iotdb.db.wal.utils.WALFileUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import org.junit.After;
@@ -36,10 +36,11 @@ import org.junit.Test;
 import java.io.File;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-public class WALManagerTest {
+public class RoundRobinStrategyTest {
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private final String[] walDirs =
       new String[] {
@@ -67,21 +68,35 @@ public class WALManagerTest {
 
   @Test
   public void testAllocateWALNode() throws IllegalPathException {
-    WALManager walManager = WALManager.getInstance();
+    RoundRobinStrategy roundRobinStrategy = new RoundRobinStrategy(6);
     IWALNode[] walNodes = new IWALNode[6];
-    for (int i = 0; i < 12; i++) {
-      IWALNode walNode = walManager.applyForWALNode();
-      if (i < 6) {
-        walNodes[i] = walNode;
-      } else {
-        assertEquals(walNodes[i % 6], walNode);
+    try {
+      for (int i = 0; i < 12; i++) {
+        IWALNode walNode = roundRobinStrategy.applyForWALNode(String.valueOf(i));
+        if (i < 6) {
+          walNodes[i] = walNode;
+        } else {
+          assertEquals(walNodes[i % 6], walNode);
+        }
+        walNode.log(i, getInsertRowPlan());
+      }
+      for (String walDir : walDirs) {
+        File walDirFile = new File(walDir);
+        assertTrue(walDirFile.exists());
+        File[] nodeDirs = walDirFile.listFiles(File::isDirectory);
+        assertNotNull(nodeDirs);
+        assertEquals(2, nodeDirs.length);
+        for (File nodeDir : nodeDirs) {
+          assertTrue(nodeDir.exists());
+          assertNotEquals(0, WALFileUtils.listAllWALFiles(nodeDir).length);
+        }
+      }
+    } finally {
+      for (IWALNode walNode : walNodes) {
+        if (walNode != null) {
+          walNode.close();
+        }
       }
-      walNode.log(i, getInsertRowPlan());
-    }
-    for (String walDir : walDirs) {
-      File walDirFile = new File(walDir);
-      assertTrue(walDirFile.exists());
-      assertNotNull(walDirFile.list(WALWriter::walFilenameFilter));
     }
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/buffer/WALBufferCommonTest.java b/server/src/test/java/org/apache/iotdb/db/wal/buffer/WALBufferCommonTest.java
index aa30956d4e..a803a410b6 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/buffer/WALBufferCommonTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/buffer/WALBufferCommonTest.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.wal.io.WALReader;
-import org.apache.iotdb.db.wal.io.WALWriter;
+import org.apache.iotdb.db.wal.utils.WALFileUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import org.junit.After;
@@ -94,7 +94,7 @@ public abstract class WALBufferCommonTest {
     }
     Thread.sleep(1_000);
     // check .wal files
-    File[] walFiles = new File(logDirectory).listFiles(WALWriter::walFilenameFilter);
+    File[] walFiles = WALFileUtils.listAllWALFiles(new File(logDirectory));
     Set<InsertRowPlan> actualInsertRowPlans = new HashSet<>();
     if (walFiles != null) {
       for (File walFile : walFiles) {
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManagerTest.java b/server/src/test/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManagerTest.java
index 91e947c022..5e77070e68 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManagerTest.java
@@ -18,12 +18,14 @@
  */
 package org.apache.iotdb.db.wal.checkpoint;
 
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.wal.io.CheckpointReader;
-import org.apache.iotdb.db.wal.io.CheckpointWriter;
 import org.apache.iotdb.db.wal.recover.CheckpointRecoverUtils;
+import org.apache.iotdb.db.wal.utils.CheckpointFileUtils;
 
 import org.junit.After;
 import org.junit.Before;
@@ -46,19 +48,24 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class CheckpointManagerTest {
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private static final String identifier = String.valueOf(Integer.MAX_VALUE);
   private static final String logDirectory = TestConstant.BASE_OUTPUT_PATH.concat("wal-test");
   private CheckpointManager checkpointManager;
+  private long prevFileSize;
 
   @Before
   public void setUp() throws Exception {
     EnvironmentUtils.cleanDir(logDirectory);
+    prevFileSize = config.getCheckpointFileSizeThresholdInByte();
+    config.setCheckpointFileSizeThresholdInByte(10 * 1024);
     checkpointManager = new CheckpointManager(identifier, logDirectory);
   }
 
   @After
   public void tearDown() throws Exception {
     checkpointManager.close();
+    config.setCheckpointFileSizeThresholdInByte(prevFileSize);
     EnvironmentUtils.cleanDir(logDirectory);
   }
 
@@ -69,7 +76,7 @@ public class CheckpointManagerTest {
     List<Checkpoint> expectedCheckpoints = Collections.singletonList(initCheckpoint);
     CheckpointReader checkpointReader =
         new CheckpointReader(
-            new File(logDirectory + File.separator + CheckpointWriter.getLogFileName(0)));
+            new File(logDirectory + File.separator + CheckpointFileUtils.getLogFileName(0)));
     List<Checkpoint> actualCheckpoints = checkpointReader.readAll();
     assertEquals(expectedCheckpoints, actualCheckpoints);
   }
@@ -122,7 +129,7 @@ public class CheckpointManagerTest {
     int versionId = 0;
     Map<Integer, MemTableInfo> expectedMemTableId2Info = new HashMap<>();
     Map<Integer, Integer> versionId2memTableId = new HashMap<>();
-    while (size < CheckpointManager.LOG_SIZE_LIMIT) {
+    while (size < config.getCheckpointFileSizeThresholdInByte()) {
       ++versionId;
       String tsFilePath = logDirectory + File.separator + versionId + ".tsfile";
       MemTableInfo memTableInfo = new MemTableInfo(new PrimitiveMemTable(), tsFilePath, versionId);
@@ -146,9 +153,9 @@ public class CheckpointManagerTest {
     assertEquals(5, checkpointManager.getFirstValidWALVersionId());
     // check checkpoint files
     assertFalse(
-        new File(logDirectory + File.separator + CheckpointWriter.getLogFileName(0)).exists());
+        new File(logDirectory + File.separator + CheckpointFileUtils.getLogFileName(0)).exists());
     assertTrue(
-        new File(logDirectory + File.separator + CheckpointWriter.getLogFileName(1)).exists());
+        new File(logDirectory + File.separator + CheckpointFileUtils.getLogFileName(1)).exists());
     // recover info from checkpoint file
     Map<Integer, MemTableInfo> actualMemTableId2Info =
         CheckpointRecoverUtils.recoverMemTableInfo(new File(logDirectory));
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
new file mode 100644
index 0000000000..7eb5f06b90
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.node;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.wal.ConsensusReqReader;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.wal.utils.WALMode;
+import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+public class ConsensusReqReaderTest {
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final String identifier = String.valueOf(Integer.MAX_VALUE);
+  private static final String logDirectory = TestConstant.BASE_OUTPUT_PATH.concat("wal-test");
+  private static final String devicePath = "root.test_sg.test_d";
+  private WALMode prevMode;
+  private WALNode walNode;
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.cleanDir(logDirectory);
+    prevMode = config.getWalMode();
+    config.setWalMode(WALMode.SYNC);
+    walNode = new WALNode(identifier, logDirectory);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    walNode.close();
+    config.setWalMode(prevMode);
+    EnvironmentUtils.cleanDir(logDirectory);
+  }
+
+  /**
+   * Generate wal files as below: <br>
+   * &nbsp _0-0.wal: 1,-1 <br>
+   * &nbsp _1-1.wal: 2,2,2 <br>
+   * &nbsp _2-2.wal: 3,3 <br>
+   * &nbsp _3-3.wal: 3,4 <br>
+   * &nbsp _4-4.wal: 4 <br>
+   * &nbsp _5-4.wal: 4,4,5 <br>
+   * &nbsp _6-5.wal: 6 <br>
+   * 1 - InsertRowNode, 2 - InsertRowsOfOneDeviceNode, 3 - InsertRowsNode, 4 -
+   * InsertMultiTabletsNode, 5 - InsertTabletNode, 6 - InsertRowNode
+   */
+  private void simulateFileScenario01() throws IllegalPathException {
+    InsertTabletNode insertTabletNode;
+    InsertRowNode insertRowNode;
+    // _0-0.wal
+    insertRowNode = getInsertRowNode(devicePath);
+    insertRowNode.setSearchIndex(1);
+    walNode.log(0, insertRowNode); // 1
+    insertTabletNode = getInsertTabletNode(devicePath, new long[] {2});
+    walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // -1
+    walNode.rollWALFile();
+    // _1-1.wal
+    insertRowNode = getInsertRowNode(devicePath);
+    insertRowNode.setSearchIndex(2);
+    walNode.log(0, insertRowNode); // 2
+    walNode.log(0, insertRowNode); // 2
+    walNode.log(0, insertRowNode); // 2
+    walNode.rollWALFile();
+    // _2-2.wal
+    insertRowNode = getInsertRowNode(devicePath);
+    insertRowNode.setSearchIndex(3);
+    walNode.log(0, insertRowNode); // 3
+    walNode.log(0, insertRowNode); // 3
+    walNode.rollWALFile();
+    // _3-3.wal
+    insertRowNode.setDevicePath(new PartialPath(devicePath + "test"));
+    walNode.log(0, insertRowNode); // 3
+    insertTabletNode = getInsertTabletNode(devicePath, new long[] {4});
+    insertTabletNode.setSearchIndex(4);
+    walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 4
+    walNode.rollWALFile();
+    // _4-4.wal
+    walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 4
+    walNode.rollWALFile();
+    // _5-4.wal
+    walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 4
+    walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 4
+    insertTabletNode = getInsertTabletNode(devicePath, new long[] {5});
+    insertTabletNode.setSearchIndex(5);
+    walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 5
+    walNode.rollWALFile();
+    // _6-5.wal
+    insertRowNode = getInsertRowNode(devicePath);
+    insertRowNode.setSearchIndex(6);
+    WALFlushListener walFlushListener = walNode.log(0, insertRowNode); // 6
+    walFlushListener.waitForResult();
+  }
+
+  @Test
+  public void scenario01TestGetReq01() throws Exception {
+    simulateFileScenario01();
+
+    IConsensusRequest request;
+    request = walNode.getReq(1);
+    Assert.assertTrue(request instanceof InsertRowNode);
+    Assert.assertEquals(1, ((InsertRowNode) request).getSearchIndex());
+    request = walNode.getReq(2);
+    Assert.assertTrue(request instanceof InsertRowsOfOneDeviceNode);
+    Assert.assertEquals(2, ((InsertRowsOfOneDeviceNode) request).getSearchIndex());
+    Assert.assertEquals(
+        3, ((InsertRowsOfOneDeviceNode) request).getInsertRowNodeIndexList().size());
+    request = walNode.getReq(3);
+    Assert.assertTrue(request instanceof InsertRowsNode);
+    Assert.assertEquals(3, ((InsertRowsNode) request).getSearchIndex());
+    Assert.assertEquals(3, ((InsertRowsNode) request).getInsertRowNodeIndexList().size());
+    request = walNode.getReq(4);
+    Assert.assertTrue(request instanceof InsertMultiTabletsNode);
+    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
+    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
+    request = walNode.getReq(5);
+    Assert.assertTrue(request instanceof InsertTabletNode);
+    Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
+    request = walNode.getReq(6);
+    Assert.assertNull(request);
+  }
+
+  @Test
+  public void scenario01TestGetReqs01() throws Exception {
+    simulateFileScenario01();
+    List<IConsensusRequest> requests;
+    IConsensusRequest request;
+
+    requests = walNode.getReqs(1, 6);
+    Assert.assertEquals(5, requests.size());
+    request = requests.get(0);
+    Assert.assertTrue(request instanceof InsertRowNode);
+    Assert.assertEquals(1, ((InsertRowNode) request).getSearchIndex());
+    request = requests.get(1);
+    Assert.assertTrue(request instanceof InsertRowsOfOneDeviceNode);
+    Assert.assertEquals(2, ((InsertRowsOfOneDeviceNode) request).getSearchIndex());
+    Assert.assertEquals(
+        3, ((InsertRowsOfOneDeviceNode) request).getInsertRowNodeIndexList().size());
+    request = requests.get(2);
+    Assert.assertTrue(request instanceof InsertRowsNode);
+    Assert.assertEquals(3, ((InsertRowsNode) request).getSearchIndex());
+    Assert.assertEquals(3, ((InsertRowsNode) request).getInsertRowNodeIndexList().size());
+    request = requests.get(3);
+    Assert.assertTrue(request instanceof InsertMultiTabletsNode);
+    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
+    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
+    request = requests.get(4);
+    Assert.assertTrue(request instanceof InsertTabletNode);
+    Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
+  }
+
+  @Test
+  public void scenario01TestGetReqs02() throws Exception {
+    simulateFileScenario01();
+    List<IConsensusRequest> requests;
+    IConsensusRequest request;
+
+    requests = walNode.getReqs(3, 1);
+    Assert.assertEquals(1, requests.size());
+    request = requests.get(0);
+    Assert.assertTrue(request instanceof InsertRowsNode);
+    Assert.assertEquals(3, ((InsertRowsNode) request).getSearchIndex());
+    Assert.assertEquals(3, ((InsertRowsNode) request).getInsertRowNodeIndexList().size());
+  }
+
+  @Test
+  public void scenario01TestGetReqs03() throws Exception {
+    simulateFileScenario01();
+    List<IConsensusRequest> requests;
+    IConsensusRequest request;
+
+    requests = walNode.getReqs(4, 2);
+    Assert.assertEquals(2, requests.size());
+    request = requests.get(0);
+    Assert.assertTrue(request instanceof InsertMultiTabletsNode);
+    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
+    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
+    request = requests.get(1);
+    Assert.assertTrue(request instanceof InsertTabletNode);
+    Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
+  }
+
+  @Test
+  public void scenario01TestGetReqs04() throws Exception {
+    simulateFileScenario01();
+    List<IConsensusRequest> requests;
+    IConsensusRequest request;
+
+    requests = walNode.getReqs(5, 100);
+    Assert.assertEquals(1, requests.size());
+    request = requests.get(0);
+    Assert.assertTrue(request instanceof InsertTabletNode);
+    Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
+  }
+
+  @Test
+  public void scenario01TestGetReqs05() throws Exception {
+    simulateFileScenario01();
+    List<IConsensusRequest> requests;
+
+    requests = walNode.getReqs(6, 100);
+    Assert.assertEquals(0, requests.size());
+  }
+
+  @Test
+  public void scenario01TestGetReqIterator01() throws Exception {
+    simulateFileScenario01();
+    IConsensusRequest request;
+    ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
+
+    Assert.assertTrue(iterator.hasNext());
+    request = iterator.next();
+    Assert.assertTrue(request instanceof InsertRowNode);
+    Assert.assertEquals(1, ((InsertRowNode) request).getSearchIndex());
+    Assert.assertTrue(iterator.hasNext());
+    request = iterator.next();
+    Assert.assertTrue(request instanceof InsertRowsOfOneDeviceNode);
+    Assert.assertEquals(2, ((InsertRowsOfOneDeviceNode) request).getSearchIndex());
+    Assert.assertEquals(
+        3, ((InsertRowsOfOneDeviceNode) request).getInsertRowNodeIndexList().size());
+    Assert.assertTrue(iterator.hasNext());
+    request = iterator.next();
+    Assert.assertTrue(request instanceof InsertRowsNode);
+    Assert.assertEquals(3, ((InsertRowsNode) request).getSearchIndex());
+    Assert.assertEquals(3, ((InsertRowsNode) request).getInsertRowNodeIndexList().size());
+    Assert.assertTrue(iterator.hasNext());
+    request = iterator.next();
+    Assert.assertTrue(request instanceof InsertMultiTabletsNode);
+    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
+    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
+    Assert.assertTrue(iterator.hasNext());
+    request = iterator.next();
+    Assert.assertTrue(request instanceof InsertTabletNode);
+    Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
+    Assert.assertFalse(iterator.hasNext());
+  }
+
+  @Test
+  public void scenario01TestGetReqIterator02() throws Exception {
+    simulateFileScenario01();
+    IConsensusRequest request;
+    ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(4);
+
+    Assert.assertTrue(iterator.hasNext());
+    request = iterator.next();
+    Assert.assertTrue(request instanceof InsertMultiTabletsNode);
+    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
+    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
+    Assert.assertTrue(iterator.hasNext());
+    request = iterator.next();
+    Assert.assertTrue(request instanceof InsertTabletNode);
+    Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
+
+    Assert.assertFalse(iterator.hasNext());
+    // wait for next
+    ExecutorService checkThread = Executors.newSingleThreadExecutor();
+    Future<Boolean> future =
+        checkThread.submit(
+            () -> {
+              iterator.waitForNextReady();
+              Assert.assertTrue(iterator.hasNext());
+              IConsensusRequest req = iterator.next();
+              Assert.assertTrue(req instanceof InsertRowNode);
+              Assert.assertEquals(6, ((InsertRowNode) req).getSearchIndex());
+              return true;
+            });
+
+    Thread.sleep(500);
+    InsertRowNode insertRowNode = getInsertRowNode(devicePath);
+    walNode.log(0, insertRowNode); // put -1 after 6
+    Assert.assertTrue(future.get());
+  }
+
+  @Test
+  public void scenario01TestGetReqIterator03() throws Exception {
+    simulateFileScenario01();
+    IConsensusRequest request;
+    ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(5);
+
+    Assert.assertTrue(iterator.hasNext());
+    request = iterator.next();
+    Assert.assertTrue(request instanceof InsertTabletNode);
+    Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
+
+    Assert.assertFalse(iterator.hasNext());
+    // wait for next
+    ExecutorService checkThread = Executors.newSingleThreadExecutor();
+    Future<Boolean> future =
+        checkThread.submit(
+            () -> {
+              iterator.waitForNextReady();
+              Assert.assertTrue(iterator.hasNext());
+              IConsensusRequest req = iterator.next();
+              Assert.assertTrue(req instanceof InsertRowNode);
+              Assert.assertEquals(6, ((InsertRowNode) req).getSearchIndex());
+              return true;
+            });
+
+    Thread.sleep(500);
+    walNode.rollWALFile();
+    InsertRowNode insertRowNode = getInsertRowNode(devicePath);
+    walNode.log(0, insertRowNode); // put -1 after 6
+    Assert.assertTrue(future.get());
+  }
+
+  @Test
+  public void scenario01TestGetReqIterator04() throws Exception {
+    simulateFileScenario01();
+    IConsensusRequest request;
+    ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
+
+    Assert.assertTrue(iterator.hasNext());
+    request = iterator.next();
+    Assert.assertTrue(request instanceof InsertRowNode);
+    Assert.assertEquals(1, ((InsertRowNode) request).getSearchIndex());
+    Assert.assertTrue(iterator.hasNext());
+    request = iterator.next();
+    Assert.assertTrue(request instanceof InsertRowsOfOneDeviceNode);
+    Assert.assertEquals(2, ((InsertRowsOfOneDeviceNode) request).getSearchIndex());
+    Assert.assertEquals(
+        3, ((InsertRowsOfOneDeviceNode) request).getInsertRowNodeIndexList().size());
+
+    iterator.skipTo(4);
+
+    Assert.assertTrue(iterator.hasNext());
+    request = iterator.next();
+    Assert.assertTrue(request instanceof InsertMultiTabletsNode);
+    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
+    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
+    Assert.assertTrue(iterator.hasNext());
+    request = iterator.next();
+    Assert.assertTrue(request instanceof InsertTabletNode);
+    Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
+    Assert.assertFalse(iterator.hasNext());
+  }
+
+  @Test
+  public void scenario01TestGetReqIterator05() throws Exception {
+    simulateFileScenario01();
+    IConsensusRequest request;
+    ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(5);
+
+    Assert.assertTrue(iterator.hasNext());
+    request = iterator.next();
+    Assert.assertTrue(request instanceof InsertTabletNode);
+    Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
+
+    iterator.skipTo(2);
+
+    Assert.assertTrue(iterator.hasNext());
+    request = iterator.next();
+    Assert.assertTrue(request instanceof InsertRowsOfOneDeviceNode);
+    Assert.assertEquals(2, ((InsertRowsOfOneDeviceNode) request).getSearchIndex());
+    Assert.assertEquals(
+        3, ((InsertRowsOfOneDeviceNode) request).getInsertRowNodeIndexList().size());
+    Assert.assertTrue(iterator.hasNext());
+    request = iterator.next();
+    Assert.assertTrue(request instanceof InsertRowsNode);
+    Assert.assertEquals(3, ((InsertRowsNode) request).getSearchIndex());
+    Assert.assertEquals(3, ((InsertRowsNode) request).getInsertRowNodeIndexList().size());
+    Assert.assertTrue(iterator.hasNext());
+    request = iterator.next();
+    Assert.assertTrue(request instanceof InsertMultiTabletsNode);
+    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
+    Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
+    Assert.assertTrue(iterator.hasNext());
+    request = iterator.next();
+    Assert.assertTrue(request instanceof InsertTabletNode);
+    Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
+    Assert.assertFalse(iterator.hasNext());
+  }
+
+  public static InsertRowNode getInsertRowNode(String devicePath) throws IllegalPathException {
+    long time = 110L;
+    TSDataType[] dataTypes =
+        new TSDataType[] {
+          TSDataType.DOUBLE,
+          TSDataType.FLOAT,
+          TSDataType.INT64,
+          TSDataType.INT32,
+          TSDataType.BOOLEAN,
+          TSDataType.TEXT
+        };
+
+    Object[] columns = new Object[6];
+    columns[0] = 1.0;
+    columns[1] = 2.0f;
+    columns[2] = 10000L;
+    columns[3] = 100;
+    columns[4] = false;
+    columns[5] = new Binary("hh" + 0);
+
+    InsertRowNode insertRowNode =
+        new InsertRowNode(
+            new PlanNodeId(""),
+            new PartialPath(devicePath),
+            false,
+            new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+            dataTypes,
+            time,
+            columns,
+            false);
+
+    insertRowNode.setMeasurementSchemas(
+        new MeasurementSchema[] {
+          new MeasurementSchema("s1", TSDataType.DOUBLE),
+          new MeasurementSchema("s2", TSDataType.FLOAT),
+          new MeasurementSchema("s3", TSDataType.INT64),
+          new MeasurementSchema("s4", TSDataType.INT32),
+          new MeasurementSchema("s5", TSDataType.BOOLEAN),
+          new MeasurementSchema("s6", TSDataType.TEXT)
+        });
+    return insertRowNode;
+  }
+
+  private InsertTabletNode getInsertTabletNode(String devicePath, long[] times)
+      throws IllegalPathException {
+    TSDataType[] dataTypes =
+        new TSDataType[] {
+          TSDataType.DOUBLE,
+          TSDataType.FLOAT,
+          TSDataType.INT64,
+          TSDataType.INT32,
+          TSDataType.BOOLEAN,
+          TSDataType.TEXT
+        };
+
+    Object[] columns = new Object[6];
+    columns[0] = new double[times.length];
+    columns[1] = new float[times.length];
+    columns[2] = new long[times.length];
+    columns[3] = new int[times.length];
+    columns[4] = new boolean[times.length];
+    columns[5] = new Binary[times.length];
+
+    for (int r = 0; r < times.length; r++) {
+      ((double[]) columns[0])[r] = 1.0 + r;
+      ((float[]) columns[1])[r] = 2 + r;
+      ((long[]) columns[2])[r] = 10000 + r;
+      ((int[]) columns[3])[r] = 100 + r;
+      ((boolean[]) columns[4])[r] = (r % 2 == 0);
+      ((Binary[]) columns[5])[r] = new Binary("hh" + r);
+    }
+
+    BitMap[] bitMaps = new BitMap[dataTypes.length];
+    for (int i = 0; i < dataTypes.length; i++) {
+      if (bitMaps[i] == null) {
+        bitMaps[i] = new BitMap(times.length);
+      }
+      bitMaps[i].mark(i % times.length);
+    }
+
+    InsertTabletNode insertTabletNode =
+        new InsertTabletNode(
+            new PlanNodeId(""),
+            new PartialPath(devicePath),
+            false,
+            new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+            dataTypes,
+            times,
+            bitMaps,
+            columns,
+            times.length);
+
+    insertTabletNode.setMeasurementSchemas(
+        new MeasurementSchema[] {
+          new MeasurementSchema("s1", TSDataType.DOUBLE),
+          new MeasurementSchema("s2", TSDataType.FLOAT),
+          new MeasurementSchema("s3", TSDataType.INT64),
+          new MeasurementSchema("s4", TSDataType.INT32),
+          new MeasurementSchema("s5", TSDataType.BOOLEAN),
+          new MeasurementSchema("s6", TSDataType.TEXT)
+        });
+
+    return insertTabletNode;
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java b/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java
index 28e6bd6004..6c72f0aeeb 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java
@@ -29,8 +29,8 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.wal.checkpoint.MemTableInfo;
 import org.apache.iotdb.db.wal.io.WALReader;
-import org.apache.iotdb.db.wal.io.WALWriter;
 import org.apache.iotdb.db.wal.recover.CheckpointRecoverUtils;
+import org.apache.iotdb.db.wal.utils.WALFileUtils;
 import org.apache.iotdb.db.wal.utils.WALMode;
 import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -115,7 +115,7 @@ public class WALNodeTest {
     }
     Thread.sleep(1_000);
     // check .wal files
-    File[] walFiles = new File(logDirectory).listFiles(WALWriter::walFilenameFilter);
+    File[] walFiles = WALFileUtils.listAllWALFiles(new File(logDirectory));
     Set<InsertTabletPlan> actualInsertTabletPlans = new HashSet<>();
     if (walFiles != null) {
       for (File walFile : walFiles) {
@@ -258,12 +258,16 @@ public class WALNodeTest {
     }
     walNode.onMemTableFlushed(memTable);
     walNode.onMemTableCreated(new PrimitiveMemTable(), tsFilePath);
-    // check existence of _0.wal file
-    assertTrue(new File(logDirectory + File.separator + WALWriter.getLogFileName(0)).exists());
-    assertTrue(new File(logDirectory + File.separator + WALWriter.getLogFileName(1)).exists());
+    // check existence of _0-0.wal file
+    assertTrue(
+        new File(logDirectory + File.separator + WALFileUtils.getLogFileName(0, 0)).exists());
+    assertTrue(
+        new File(logDirectory + File.separator + WALFileUtils.getLogFileName(1, 0)).exists());
     walNode.deleteOutdatedFiles();
-    assertFalse(new File(logDirectory + File.separator + WALWriter.getLogFileName(0)).exists());
-    assertTrue(new File(logDirectory + File.separator + WALWriter.getLogFileName(1)).exists());
+    assertFalse(
+        new File(logDirectory + File.separator + WALFileUtils.getLogFileName(0, 0)).exists());
+    assertTrue(
+        new File(logDirectory + File.separator + WALFileUtils.getLogFileName(1, 0)).exists());
     // check flush listeners
     try {
       for (WALFlushListener walFlushListener : walFlushListeners) {
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/utils/WALFileUtilsTest.java b/server/src/test/java/org/apache/iotdb/db/wal/utils/WALFileUtilsTest.java
new file mode 100644
index 0000000000..007b40346b
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/wal/utils/WALFileUtilsTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+
+public class WALFileUtilsTest {
+  @Test
+  public void binarySearchFileBySearchIndex01() {
+    File[] files =
+        new File[] {
+          new File(WALFileUtils.getLogFileName(0, 0)),
+          new File(WALFileUtils.getLogFileName(1, 5)),
+          new File(WALFileUtils.getLogFileName(2, 5)),
+          new File(WALFileUtils.getLogFileName(3, 12)),
+          new File(WALFileUtils.getLogFileName(4, 12))
+        };
+    int i = WALFileUtils.binarySearchFileBySearchIndex(files, 0);
+    Assert.assertEquals(-1, i);
+  }
+
+  @Test
+  public void binarySearchFileBySearchIndex02() {
+    File[] files =
+        new File[] {
+          new File(WALFileUtils.getLogFileName(0, 0)),
+          new File(WALFileUtils.getLogFileName(1, 5)),
+          new File(WALFileUtils.getLogFileName(2, 5)),
+          new File(WALFileUtils.getLogFileName(3, 12)),
+          new File(WALFileUtils.getLogFileName(4, 12))
+        };
+    int i = WALFileUtils.binarySearchFileBySearchIndex(files, 2);
+    Assert.assertEquals(0, i);
+  }
+
+  @Test
+  public void binarySearchFileBySearchIndex03() {
+    File[] files =
+        new File[] {
+          new File(WALFileUtils.getLogFileName(0, 0)),
+          new File(WALFileUtils.getLogFileName(1, 5)),
+          new File(WALFileUtils.getLogFileName(2, 5)),
+          new File(WALFileUtils.getLogFileName(3, 12)),
+          new File(WALFileUtils.getLogFileName(4, 12))
+        };
+    int i = WALFileUtils.binarySearchFileBySearchIndex(files, 5);
+    Assert.assertEquals(0, i);
+  }
+
+  @Test
+  public void binarySearchFileBySearchIndex04() {
+    File[] files =
+        new File[] {
+          new File(WALFileUtils.getLogFileName(0, 0)),
+          new File(WALFileUtils.getLogFileName(1, 5)),
+          new File(WALFileUtils.getLogFileName(2, 5)),
+          new File(WALFileUtils.getLogFileName(3, 12)),
+          new File(WALFileUtils.getLogFileName(4, 12))
+        };
+    int i = WALFileUtils.binarySearchFileBySearchIndex(files, 6);
+    Assert.assertEquals(2, i);
+  }
+
+  @Test
+  public void binarySearchFileBySearchIndex05() {
+    File[] files =
+        new File[] {
+          new File(WALFileUtils.getLogFileName(0, 0)),
+          new File(WALFileUtils.getLogFileName(1, 5)),
+          new File(WALFileUtils.getLogFileName(2, 5)),
+          new File(WALFileUtils.getLogFileName(3, 12)),
+          new File(WALFileUtils.getLogFileName(4, 12))
+        };
+    int i = WALFileUtils.binarySearchFileBySearchIndex(files, 10);
+    Assert.assertEquals(2, i);
+  }
+
+  @Test
+  public void binarySearchFileBySearchIndex06() {
+    File[] files =
+        new File[] {
+          new File(WALFileUtils.getLogFileName(0, 0)),
+          new File(WALFileUtils.getLogFileName(1, 5)),
+          new File(WALFileUtils.getLogFileName(2, 5)),
+          new File(WALFileUtils.getLogFileName(3, 12)),
+          new File(WALFileUtils.getLogFileName(4, 12))
+        };
+    int i = WALFileUtils.binarySearchFileBySearchIndex(files, 12);
+    Assert.assertEquals(2, i);
+  }
+
+  @Test
+  public void binarySearchFileBySearchIndex07() {
+    File[] files =
+        new File[] {
+          new File(WALFileUtils.getLogFileName(0, 0)),
+          new File(WALFileUtils.getLogFileName(1, 5)),
+          new File(WALFileUtils.getLogFileName(2, 5)),
+          new File(WALFileUtils.getLogFileName(3, 12)),
+          new File(WALFileUtils.getLogFileName(4, 12))
+        };
+    int i = WALFileUtils.binarySearchFileBySearchIndex(files, Long.MAX_VALUE);
+    Assert.assertEquals(4, i);
+  }
+
+  @Test
+  public void binarySearchFileBySearchIndex08() {
+    File[] files =
+        new File[] {
+          new File(WALFileUtils.getLogFileName(0, 0)),
+          new File(WALFileUtils.getLogFileName(1, 100)),
+          new File(WALFileUtils.getLogFileName(2, 200)),
+          new File(WALFileUtils.getLogFileName(3, 300)),
+          new File(WALFileUtils.getLogFileName(4, 400))
+        };
+    int i = WALFileUtils.binarySearchFileBySearchIndex(files, 10);
+    Assert.assertEquals(0, i);
+  }
+
+  @Test
+  public void binarySearchFileBySearchIndex09() {
+    File[] files =
+        new File[] {
+          new File(WALFileUtils.getLogFileName(0, 0)),
+          new File(WALFileUtils.getLogFileName(1, 100)),
+          new File(WALFileUtils.getLogFileName(2, 200)),
+          new File(WALFileUtils.getLogFileName(3, 300)),
+          new File(WALFileUtils.getLogFileName(4, 400))
+        };
+    int i = WALFileUtils.binarySearchFileBySearchIndex(files, 250);
+    Assert.assertEquals(2, i);
+  }
+
+  @Test
+  public void binarySearchFileBySearchIndex10() {
+    File[] files =
+        new File[] {
+          new File(WALFileUtils.getLogFileName(0, 0)),
+          new File(WALFileUtils.getLogFileName(1, 0)),
+          new File(WALFileUtils.getLogFileName(2, 0)),
+          new File(WALFileUtils.getLogFileName(3, 0)),
+          new File(WALFileUtils.getLogFileName(4, 5)),
+          new File(WALFileUtils.getLogFileName(5, 5)),
+          new File(WALFileUtils.getLogFileName(6, 5)),
+          new File(WALFileUtils.getLogFileName(7, 5)),
+          new File(WALFileUtils.getLogFileName(8, 12)),
+          new File(WALFileUtils.getLogFileName(9, 12)),
+          new File(WALFileUtils.getLogFileName(10, 12)),
+          new File(WALFileUtils.getLogFileName(11, 12))
+        };
+    int i = WALFileUtils.binarySearchFileBySearchIndex(files, 5);
+    Assert.assertEquals(3, i);
+  }
+
+  @Test
+  public void binarySearchFileBySearchIndex11() {
+    File[] files =
+        new File[] {
+          new File(WALFileUtils.getLogFileName(0, 0)),
+          new File(WALFileUtils.getLogFileName(1, 0)),
+          new File(WALFileUtils.getLogFileName(2, 0)),
+          new File(WALFileUtils.getLogFileName(3, 0)),
+          new File(WALFileUtils.getLogFileName(4, 5)),
+          new File(WALFileUtils.getLogFileName(5, 5)),
+          new File(WALFileUtils.getLogFileName(6, 5)),
+          new File(WALFileUtils.getLogFileName(7, 5)),
+          new File(WALFileUtils.getLogFileName(8, 12)),
+          new File(WALFileUtils.getLogFileName(9, 12)),
+          new File(WALFileUtils.getLogFileName(10, 12)),
+          new File(WALFileUtils.getLogFileName(11, 12))
+        };
+    int i = WALFileUtils.binarySearchFileBySearchIndex(files, 6);
+    Assert.assertEquals(7, i);
+  }
+
+  @Test
+  public void binarySearchFileBySearchIndex12() {
+    File[] files =
+        new File[] {
+          new File(WALFileUtils.getLogFileName(0, 0)),
+          new File(WALFileUtils.getLogFileName(1, 0)),
+          new File(WALFileUtils.getLogFileName(2, 0)),
+          new File(WALFileUtils.getLogFileName(3, 0)),
+          new File(WALFileUtils.getLogFileName(4, 5)),
+          new File(WALFileUtils.getLogFileName(5, 5)),
+          new File(WALFileUtils.getLogFileName(6, 5)),
+          new File(WALFileUtils.getLogFileName(7, 5)),
+          new File(WALFileUtils.getLogFileName(8, 12)),
+          new File(WALFileUtils.getLogFileName(9, 12)),
+          new File(WALFileUtils.getLogFileName(10, 12)),
+          new File(WALFileUtils.getLogFileName(11, 12))
+        };
+    int i = WALFileUtils.binarySearchFileBySearchIndex(files, 12);
+    Assert.assertEquals(7, i);
+  }
+}