You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/05/15 13:03:35 UTC

[iotdb] branch master updated: [IOTDB-5823] wal pipe handler for the pipe module (#9708)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c9fe5c44637 [IOTDB-5823] wal pipe handler for the pipe module (#9708)
c9fe5c44637 is described below

commit c9fe5c44637b5084f984c7a14c5a4168341b989a
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Mon May 15 21:03:26 2023 +0800

    [IOTDB-5823] wal pipe handler for the pipe module (#9708)
---
 .../consensus/iot/wal/ConsensusReqReader.java      |   3 +
 .../consensus/iot/util/FakeConsensusReqReader.java |   5 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  21 +-
 .../iotdb/db/wal/buffer/AbstractWALBuffer.java     |  26 ++-
 .../org/apache/iotdb/db/wal/buffer/WALBuffer.java  |  32 +--
 .../org/apache/iotdb/db/wal/buffer/WALEntry.java   |   4 +-
 .../iotdb/db/wal/checkpoint/CheckpointManager.java | 118 +++++++---
 .../iotdb/db/wal/checkpoint/MemTableInfo.java      |  28 ++-
 .../MemTablePinException.java}                     |  17 +-
 .../WALPipeException.java}                         |  17 +-
 .../apache/iotdb/db/wal/io/WALByteBufReader.java   |   6 +-
 .../org/apache/iotdb/db/wal/node/WALFakeNode.java  |  24 +-
 .../java/org/apache/iotdb/db/wal/node/WALNode.java |  57 ++++-
 .../iotdb/db/wal/utils/WALEntryPosition.java       | 157 +++++++++++++
 .../apache/iotdb/db/wal/utils/WALFileUtils.java    |  11 +
 .../iotdb/db/wal/utils/WALInsertNodeCache.java     | 170 ++++++++++++++
 .../apache/iotdb/db/wal/utils/WALPipeHandler.java  | 132 +++++++++++
 .../db/wal/utils/listener/WALFlushListener.java    |  13 +-
 .../org/apache/iotdb/db/wal/node/WALNodeTest.java  |   7 +-
 .../iotdb/db/wal/node/WALPipeHandlerTest.java      | 256 +++++++++++++++++++++
 .../iotdb/db/wal/utils/WALInsertNodeCacheTest.java | 167 ++++++++++++++
 21 files changed, 1176 insertions(+), 95 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/wal/ConsensusReqReader.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/wal/ConsensusReqReader.java
index 0fedc0ed085..f7bc2cacad0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/wal/ConsensusReqReader.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/wal/ConsensusReqReader.java
@@ -80,6 +80,9 @@ public interface ConsensusReqReader {
   /** Get current search index */
   long getCurrentSearchIndex();
 
+  /** Get current wal file version */
+  long getCurrentWALFileVersion();
+
   /** Get total size of wal files */
   long getTotalSize();
 }
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/FakeConsensusReqReader.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/FakeConsensusReqReader.java
index 3a8c8e4e8c8..da58725f7b5 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/FakeConsensusReqReader.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/FakeConsensusReqReader.java
@@ -47,6 +47,11 @@ public class FakeConsensusReqReader implements ConsensusReqReader, DataSet {
     return requestSets.getLocalRequestNumber();
   }
 
+  @Override
+  public long getCurrentWALFileVersion() {
+    return 0;
+  }
+
   @Override
   public long getTotalSize() {
     return 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c7e6c09d633..76fce18d052 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -552,6 +552,10 @@ public class IoTDBConfig {
 
   /** Memory allocated proportion for time partition info */
   private long allocateMemoryForTimePartitionInfo = allocateMemoryForStorageEngine * 50 / 1001;
+
+  /** Memory allocated proportion for wal pipe cache */
+  private long allocateMemoryForWALPipeCache = allocateMemoryForConsensus / 10;
+
   /**
    * If true, we will estimate each query's possible memory footprint before executing it and deny
    * it if its estimated memory exceeds current free memory
@@ -1918,10 +1922,6 @@ public class IoTDBConfig {
     return allocateMemoryForSchema;
   }
 
-  public long getAllocateMemoryForConsensus() {
-    return allocateMemoryForConsensus;
-  }
-
   public void setAllocateMemoryForSchema(long allocateMemoryForSchema) {
     this.allocateMemoryForSchema = allocateMemoryForSchema;
 
@@ -1930,8 +1930,13 @@ public class IoTDBConfig {
     this.allocateMemoryForLastCache = allocateMemoryForSchema / 10;
   }
 
+  public long getAllocateMemoryForConsensus() {
+    return allocateMemoryForConsensus;
+  }
+
   public void setAllocateMemoryForConsensus(long allocateMemoryForConsensus) {
     this.allocateMemoryForConsensus = allocateMemoryForConsensus;
+    this.allocateMemoryForWALPipeCache = allocateMemoryForConsensus / 10;
   }
 
   public long getAllocateMemoryForRead() {
@@ -2198,6 +2203,14 @@ public class IoTDBConfig {
     this.allocateMemoryForTimePartitionInfo = allocateMemoryForTimePartitionInfo;
   }
 
+  public long getAllocateMemoryForWALPipeCache() {
+    return allocateMemoryForWALPipeCache;
+  }
+
+  public void setAllocateMemoryForWALPipeCache(long allocateMemoryForWALPipeCache) {
+    this.allocateMemoryForWALPipeCache = allocateMemoryForWALPipeCache;
+  }
+
   public boolean isEnableQueryMemoryEstimation() {
     return enableQueryMemoryEstimation;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
index 437e34cd5b2..372692ab46d 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
@@ -83,22 +83,29 @@ public abstract class AbstractWALBuffer implements IWALBuffer {
     return currentWALFileWriter.size();
   }
 
-  /** Notice: only called by syncBufferThread and old log writer will be closed by this function. */
-  protected void rollLogWriter(long searchIndex, WALFileStatus fileStatus) throws IOException {
+  /**
+   * Notice: only called by syncBufferThread and old log writer will be closed by this function.
+   *
+   * @return last wal file
+   */
+  protected File rollLogWriter(long searchIndex, WALFileStatus fileStatus) throws IOException {
     // close file
-    File currentFile = currentWALFileWriter.getLogFile();
-    String currentName = currentFile.getName();
+    File lastFile = currentWALFileWriter.getLogFile();
+    String lastName = lastFile.getName();
     currentWALFileWriter.close();
     addDiskUsage(currentWALFileWriter.size());
     addFileNum(1);
-    if (WALFileUtils.parseStatusCode(currentName) != fileStatus) {
+    if (WALFileUtils.parseStatusCode(lastName) != fileStatus) {
       String targetName =
           WALFileUtils.getLogFileName(
-              WALFileUtils.parseVersionId(currentName),
-              WALFileUtils.parseStartSearchIndex(currentName),
+              WALFileUtils.parseVersionId(lastName),
+              WALFileUtils.parseStartSearchIndex(lastName),
               fileStatus);
-      if (!currentFile.renameTo(SystemFileFactory.INSTANCE.getFile(logDirectory, targetName))) {
-        logger.error("Fail to rename file {} to {}", currentName, targetName);
+      File targetFile = SystemFileFactory.INSTANCE.getFile(logDirectory, targetName);
+      if (lastFile.renameTo(targetFile)) {
+        lastFile = targetFile;
+      } else {
+        logger.error("Fail to rename file {} to {}", lastName, targetName);
       }
     }
     // roll file
@@ -111,6 +118,7 @@ public abstract class AbstractWALBuffer implements IWALBuffer {
     currentWALFileWriter = new WALWriter(nextLogFile);
     currentWALFileVersion = nextFileVersion;
     logger.debug("Open new wal file {} for wal node-{}'s buffer.", nextLogFile, identifier);
+    return lastFile;
   }
 
   public long getDiskUsage() {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
index b1cbfcf963c..4ef2315fc61 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
@@ -40,7 +40,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
-import java.util.LinkedList;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -140,7 +140,7 @@ public class WALBuffer extends AbstractWALBuffer {
   /** This info class traverses some extra info from serializeThread to syncBufferThread */
   private static class SerializeInfo {
     final WALMetaData metaData = new WALMetaData();
-    final List<WALFlushListener> fsyncListeners = new LinkedList<>();
+    final List<WALFlushListener> fsyncListeners = new ArrayList<>();
     WALFlushListener rollWALFileWriterListener = null;
   }
 
@@ -222,19 +222,12 @@ public class WALBuffer extends AbstractWALBuffer {
         return handleSignalEntry((WALSignalEntry) walEntry);
       }
 
-      boolean success = handleInfoEntry(walEntry);
-      if (success) {
-        info.fsyncListeners.add(walEntry.getWalFlushListener());
-      }
+      handleInfoEntry(walEntry);
       return false;
     }
 
-    /**
-     * Handle a normal WALEntry.
-     *
-     * @return true if serialization is successful.
-     */
-    private boolean handleInfoEntry(WALEntry walEntry) {
+    /** Handle a normal WALEntry. */
+    private void handleInfoEntry(WALEntry walEntry) {
       int size = byteBufferView.position();
       try {
         long start = System.nanoTime();
@@ -245,9 +238,9 @@ public class WALBuffer extends AbstractWALBuffer {
         logger.error(
             "Fail to serialize WALEntry to wal node-{}'s buffer, discard it.", identifier, e);
         walEntry.getWalFlushListener().fail(e);
-        return false;
+        return;
       }
-      // update search index
+      // parse search index
       long searchIndex = DEFAULT_SEARCH_INDEX;
       if (walEntry.getType().needSearch()) {
         if (walEntry.getType() == WALEntryType.DELETE_DATA_NODE) {
@@ -260,9 +253,11 @@ public class WALBuffer extends AbstractWALBuffer {
           currentFileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
         }
       }
+      // update related info
       totalSize += size;
       info.metaData.add(size, searchIndex);
-      return true;
+      walEntry.getWalFlushListener().getWalPipeHandler().setSize(size);
+      info.fsyncListeners.add(walEntry.getWalFlushListener());
     }
 
     /**
@@ -439,8 +434,11 @@ public class WALBuffer extends AbstractWALBuffer {
     @Override
     public void run() {
       long start = System.nanoTime();
+      long walFileVersionId = currentWALFileVersion;
+      long position = currentWALFileWriter.size();
       currentWALFileWriter.updateFileStatus(fileStatus);
 
+      // calculate buffer used ratio
       double usedRatio = (double) syncingBuffer.position() / syncingBuffer.capacity();
       WRITING_METRICS.recordWALBufferUsedRatio(usedRatio);
       logger.debug(
@@ -497,6 +495,10 @@ public class WALBuffer extends AbstractWALBuffer {
       if (forceSuccess) {
         for (WALFlushListener fsyncListener : info.fsyncListeners) {
           fsyncListener.succeed();
+          if (fsyncListener.getWalPipeHandler() != null) {
+            fsyncListener.getWalPipeHandler().setEntryPosition(walFileVersionId, position);
+            position += fsyncListener.getWalPipeHandler().getSize();
+          }
         }
       }
       WRITING_METRICS.recordWALBufferEntriesCount(info.fsyncListeners.size());
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java
index 64beef1b37b..04781a82b22 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java
@@ -70,14 +70,14 @@ public abstract class WALEntry implements SerializedSize {
     } else {
       throw new RuntimeException("Unknown WALEntry type");
     }
-    walFlushListener = new WALFlushListener(wait);
+    walFlushListener = new WALFlushListener(wait, value);
   }
 
   protected WALEntry(WALEntryType type, long memTableId, WALEntryValue value, boolean wait) {
     this.type = type;
     this.memTableId = memTableId;
     this.value = value;
-    this.walFlushListener = new WALFlushListener(wait);
+    this.walFlushListener = new WALFlushListener(wait, value);
   }
 
   public abstract void serialize(IWALByteBufferView buffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java b/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java
index e9bcbf81df1..4870e9588cc 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java
@@ -23,9 +23,11 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.service.metrics.recorder.WritingMetricsManager;
+import org.apache.iotdb.db.wal.exception.MemTablePinException;
 import org.apache.iotdb.db.wal.io.CheckpointWriter;
 import org.apache.iotdb.db.wal.io.ILogWriter;
 import org.apache.iotdb.db.wal.utils.CheckpointFileUtils;
+import org.apache.iotdb.db.wal.utils.WALInsertNodeCache;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,6 +86,15 @@ public class CheckpointManager implements AutoCloseable {
     logHeader();
   }
 
+  private List<MemTableInfo> snapshotMemTableInfos() {
+    infoLock.lock();
+    try {
+      return new ArrayList<>(memTableId2Info.values());
+    } finally {
+      infoLock.unlock();
+    }
+  }
+
   private void logHeader() {
     infoLock.lock();
     try {
@@ -108,9 +119,9 @@ public class CheckpointManager implements AutoCloseable {
    */
   private void makeGlobalInfoCP() {
     long start = System.nanoTime();
-    Checkpoint checkpoint =
-        new Checkpoint(
-            CheckpointType.GLOBAL_MEMORY_TABLE_INFO, new ArrayList<>(memTableId2Info.values()));
+    List<MemTableInfo> memTableInfos = snapshotMemTableInfos();
+    memTableInfos.removeIf(MemTableInfo::isFlushed);
+    Checkpoint checkpoint = new Checkpoint(CheckpointType.GLOBAL_MEMORY_TABLE_INFO, memTableInfos);
     logByCachedByteBuffer(checkpoint);
     WRITING_METRICS.recordMakeCheckpointCost(checkpoint.getType(), System.nanoTime() - start);
   }
@@ -138,10 +149,14 @@ public class CheckpointManager implements AutoCloseable {
     infoLock.lock();
     long start = System.nanoTime();
     try {
-      MemTableInfo memTableInfo = memTableId2Info.remove(memTableId);
+      MemTableInfo memTableInfo = memTableId2Info.get(memTableId);
       if (memTableInfo == null) {
         return;
       }
+      memTableInfo.setFlushed();
+      if (!memTableInfo.isPinned()) {
+        memTableId2Info.remove(memTableId);
+      }
       Checkpoint checkpoint =
           new Checkpoint(
               CheckpointType.FLUSH_MEMORY_TABLE, Collections.singletonList(memTableInfo));
@@ -224,16 +239,69 @@ public class CheckpointManager implements AutoCloseable {
   }
   // endregion
 
-  /** Get MemTableInfo of oldest MemTable, whose first version id is smallest */
-  public MemTableInfo getOldestMemTableInfo() {
-    // find oldest memTable
-    List<MemTableInfo> memTableInfos;
+  // region methods for pipe
+  /**
+   * Pin the wal files of the given memory table. Notice: cannot pin one memTable too long,
+   * otherwise the wal disk usage may too large.
+   *
+   * @throws MemTablePinException If the memTable has been flushed
+   */
+  public void pinMemTable(long memTableId) throws MemTablePinException {
+    infoLock.lock();
+    try {
+      if (!memTableId2Info.containsKey(memTableId)) {
+        throw new MemTablePinException(
+            String.format(
+                "Fail to pin memTable-%d because this memTable doesn't exist in the wal.",
+                memTableId));
+      }
+      MemTableInfo memTableInfo = memTableId2Info.get(memTableId);
+      if (!memTableInfo.isPinned()) {
+        WALInsertNodeCache.getInstance().addMemTable(memTableId);
+      }
+      memTableInfo.pin();
+    } finally {
+      infoLock.unlock();
+    }
+  }
+
+  /**
+   * Unpin the wal files of the given memory table.
+   *
+   * @throws MemTablePinException If there aren't corresponding pin operations
+   */
+  public void unpinMemTable(long memTableId) throws MemTablePinException {
     infoLock.lock();
     try {
-      memTableInfos = new ArrayList<>(memTableId2Info.values());
+      if (!memTableId2Info.containsKey(memTableId)) {
+        throw new MemTablePinException(
+            String.format(
+                "Fail to unpin memTable-%d because this memTable doesn't exist in the wal.",
+                memTableId));
+      }
+      if (!memTableId2Info.get(memTableId).isPinned()) {
+        throw new MemTablePinException(
+            String.format(
+                "Fail to unpin memTable-%d because this memTable hasn't been pinned.", memTableId));
+      }
+      MemTableInfo memTableInfo = memTableId2Info.get(memTableId);
+      memTableInfo.unpin();
+      if (!memTableInfo.isPinned()) {
+        WALInsertNodeCache.getInstance().removeMemTable(memTableId);
+        if (memTableInfo.isFlushed()) {
+          memTableId2Info.remove(memTableId);
+        }
+      }
     } finally {
       infoLock.unlock();
     }
+  }
+  // endregion
+
+  /** Get MemTableInfo of oldest MemTable, whose first version id is smallest */
+  public MemTableInfo getOldestMemTableInfo() {
+    // find oldest memTable
+    List<MemTableInfo> memTableInfos = snapshotMemTableInfos();
     if (memTableInfos.isEmpty()) {
       return null;
     }
@@ -252,13 +320,7 @@ public class CheckpointManager implements AutoCloseable {
    * @return Return {@link Long#MIN_VALUE} if no file is valid
    */
   public long getFirstValidWALVersionId() {
-    List<MemTableInfo> memTableInfos;
-    infoLock.lock();
-    try {
-      memTableInfos = new ArrayList<>(memTableId2Info.values());
-    } finally {
-      infoLock.unlock();
-    }
+    List<MemTableInfo> memTableInfos = snapshotMemTableInfos();
     long firstValidVersionId = memTableInfos.isEmpty() ? Long.MIN_VALUE : Long.MAX_VALUE;
     for (MemTableInfo memTableInfo : memTableInfos) {
       firstValidVersionId = Math.min(firstValidVersionId, memTableInfo.getFirstFileVersionId());
@@ -268,25 +330,17 @@ public class CheckpointManager implements AutoCloseable {
 
   /** Get total cost of active memTables */
   public long getTotalCostOfActiveMemTables() {
+    List<MemTableInfo> memTableInfos = snapshotMemTableInfos();
     long totalCost = 0;
-
-    if (!config.isEnableMemControl()) {
-      infoLock.lock();
-      try {
-        totalCost = memTableId2Info.size();
-      } finally {
-        infoLock.unlock();
-      }
-    } else {
-      List<MemTableInfo> memTableInfos;
-      infoLock.lock();
-      try {
-        memTableInfos = new ArrayList<>(memTableId2Info.values());
-      } finally {
-        infoLock.unlock();
+    for (MemTableInfo memTableInfo : memTableInfos) {
+      // flushed memTables are not active
+      if (memTableInfo.isFlushed()) {
+        continue;
       }
-      for (MemTableInfo memTableInfo : memTableInfos) {
+      if (config.isEnableMemControl()) {
         totalCost += memTableInfo.getMemTable().getTVListsRamCost();
+      } else {
+        totalCost++;
       }
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/MemTableInfo.java b/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/MemTableInfo.java
index 3de6f7fcdae..bcf1ebe338e 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/MemTableInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/MemTableInfo.java
@@ -29,7 +29,7 @@ import java.nio.ByteBuffer;
 import java.util.Objects;
 
 /**
- * MemTableInfo records brief info of one memtable, including memTable id, tsFile path, and .wal
+ * MemTableInfo records brief info of one memTable, including memTable id, tsFile path, and .wal
  * file version id of its first {@link WALEntry}.
  */
 public class MemTableInfo implements SerializedSize {
@@ -38,6 +38,10 @@ public class MemTableInfo implements SerializedSize {
 
   /** memTable */
   private IMemTable memTable;
+  /** memTable pin count */
+  private int pinCount;
+  /** memTable is flushed or not */
+  private boolean flushed;
   /** memTable id */
   private long memTableId;
   /** path of the tsFile which this memTable will be flushed to */
@@ -94,6 +98,28 @@ public class MemTableInfo implements SerializedSize {
     return memTable;
   }
 
+  public void pin() {
+    this.pinCount++;
+  }
+
+  public void unpin() {
+    this.pinCount--;
+  }
+
+  public boolean isPinned() {
+    return pinCount > 0;
+  }
+
+  public boolean isFlushed() {
+    return flushed;
+  }
+
+  public void setFlushed() {
+    // avoid memory leak;
+    this.memTable = null;
+    this.flushed = true;
+  }
+
   public long getMemTableId() {
     return memTableId;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java b/server/src/main/java/org/apache/iotdb/db/wal/exception/MemTablePinException.java
similarity index 70%
copy from server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java
copy to server/src/main/java/org/apache/iotdb/db/wal/exception/MemTablePinException.java
index 846e47961f9..3e9722d8cc1 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/exception/MemTablePinException.java
@@ -16,11 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.wal.utils.listener;
+package org.apache.iotdb.db.wal.exception;
 
-/** This class helps judge whether wal is flushed to the storage device. */
-public class WALFlushListener extends AbstractResultListener {
-  public WALFlushListener(boolean wait) {
-    super(wait);
+public class MemTablePinException extends WALException {
+  public MemTablePinException(Throwable cause) {
+    super(cause);
+  }
+
+  public MemTablePinException(String message) {
+    super(message);
+  }
+
+  public MemTablePinException(String message, Throwable cause) {
+    super(message, cause);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java b/server/src/main/java/org/apache/iotdb/db/wal/exception/WALPipeException.java
similarity index 71%
copy from server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java
copy to server/src/main/java/org/apache/iotdb/db/wal/exception/WALPipeException.java
index 846e47961f9..e596ca3cf93 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/exception/WALPipeException.java
@@ -16,11 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.wal.utils.listener;
+package org.apache.iotdb.db.wal.exception;
 
-/** This class helps judge whether wal is flushed to the storage device. */
-public class WALFlushListener extends AbstractResultListener {
-  public WALFlushListener(boolean wait) {
-    super(wait);
+public class WALPipeException extends WALException {
+  public WALPipeException(Throwable cause) {
+    super(cause);
+  }
+
+  public WALPipeException(String message) {
+    super(message);
+  }
+
+  public WALPipeException(String message, Throwable cause) {
+    super(message, cause);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/WALByteBufReader.java b/server/src/main/java/org/apache/iotdb/db/wal/io/WALByteBufReader.java
index d5c02437950..1845bd19fc4 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/io/WALByteBufReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/WALByteBufReader.java
@@ -42,8 +42,12 @@ public class WALByteBufReader implements Closeable {
   private final Iterator<Integer> sizeIterator;
 
   public WALByteBufReader(File logFile) throws IOException {
+    this(logFile, FileChannel.open(logFile.toPath(), StandardOpenOption.READ));
+  }
+
+  public WALByteBufReader(File logFile, FileChannel channel) throws IOException {
     this.logFile = logFile;
-    this.channel = FileChannel.open(logFile.toPath(), StandardOpenOption.READ);
+    this.channel = channel;
     if (channel.size() < MAGIC_STRING_BYTES || !readTailMagic().equals(MAGIC_STRING)) {
       throw new IOException(String.format("Broken wal file %s", logFile));
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
index 07fd08bfafd..e07078988d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
@@ -28,7 +28,8 @@ import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
 /** This class provides fake wal node when wal is disabled or exception happens. */
 public class WALFakeNode implements IWALNode {
   private final WALFlushListener.Status status;
-  private final Exception cause;
+  private final WALFlushListener successListener;
+  private final WALFlushListener failListener;
 
   private WALFakeNode(WALFlushListener.Status status) {
     this(status, null);
@@ -36,7 +37,10 @@ public class WALFakeNode implements IWALNode {
 
   public WALFakeNode(WALFlushListener.Status status, Exception cause) {
     this.status = status;
-    this.cause = cause;
+    this.successListener = new WALFlushListener(false, null);
+    this.successListener.succeed();
+    this.failListener = new WALFlushListener(false, null);
+    this.failListener.fail(cause);
   }
 
   @Override
@@ -56,18 +60,13 @@ public class WALFakeNode implements IWALNode {
   }
 
   private WALFlushListener getResult() {
-    WALFlushListener walFlushListener = new WALFlushListener(false);
     switch (status) {
       case SUCCESS:
-        walFlushListener.succeed();
-        break;
+        return successListener;
       case FAILURE:
-        walFlushListener.fail(cause);
-        break;
       default:
-        break;
+        return failListener;
     }
-    return walFlushListener;
   }
 
   @Override
@@ -102,7 +101,12 @@ public class WALFakeNode implements IWALNode {
 
   @Override
   public long getCurrentSearchIndex() {
-    throw new UnsupportedOperationException();
+    return 0;
+  }
+
+  @Override
+  public long getCurrentWALFileVersion() {
+    return 0;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index fe4ae858438..6a6ede3a3cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.db.wal.buffer.WALInfoEntry;
 import org.apache.iotdb.db.wal.buffer.WALSignalEntry;
 import org.apache.iotdb.db.wal.checkpoint.CheckpointManager;
 import org.apache.iotdb.db.wal.checkpoint.MemTableInfo;
+import org.apache.iotdb.db.wal.exception.MemTablePinException;
 import org.apache.iotdb.db.wal.io.WALByteBufReader;
 import org.apache.iotdb.db.wal.utils.WALFileStatus;
 import org.apache.iotdb.db.wal.utils.WALFileUtils;
@@ -148,6 +149,9 @@ public class WALNode implements IWALNode {
 
   private WALFlushListener log(WALEntry walEntry) {
     buffer.write(walEntry);
+    // set handler for pipe
+    walEntry.getWalFlushListener().getWalPipeHandler().setMemTableId(walEntry.getMemTableId());
+    walEntry.getWalFlushListener().getWalPipeHandler().setWalNode(this);
     return walEntry.getWalFlushListener();
   }
 
@@ -183,6 +187,18 @@ public class WALNode implements IWALNode {
     checkpointManager.makeCreateMemTableCP(memTableInfo);
   }
 
+  // region methods for pipe
+  /** Pin the wal files of the given memory table */
+  public void pinMemTable(long memTableId) throws MemTablePinException {
+    checkpointManager.pinMemTable(memTableId);
+  }
+
+  /** Unpin the wal files of the given memory table */
+  public void unpinMemTable(long memTableId) throws MemTablePinException {
+    checkpointManager.unpinMemTable(memTableId);
+  }
+  // endregion
+
   // region Task to delete outdated .wal files
   /** Delete outdated .wal files */
   public void deleteOutdatedFiles() {
@@ -197,6 +213,8 @@ public class WALNode implements IWALNode {
     private static final int MAX_RECURSION_TIME = 5;
     /** .wal files whose version ids are less than first valid version id should be deleted */
     private long firstValidVersionId;
+    /** the effective information ratio */
+    private double effectiveInfoRatio;
     /** recursion time of calling deletion */
     private int recursionTime = 0;
 
@@ -232,7 +250,7 @@ public class WALNode implements IWALNode {
       if (totalCost == 0) {
         return;
       }
-      double effectiveInfoRatio = (double) costOfActiveMemTables / totalCost;
+      effectiveInfoRatio = (double) costOfActiveMemTables / totalCost;
       WRITING_METRICS.recordWALNodeEffectiveInfoRatio(identifier, effectiveInfoRatio);
       logger.debug(
           "Effective information ratio is {}, active memTables cost is {}, flushed memTables cost is {}",
@@ -243,7 +261,7 @@ public class WALNode implements IWALNode {
       // update first valid version id by snapshotting or flushing memTable,
       // then delete old .wal files again
       if (effectiveInfoRatio < config.getWalMinEffectiveInfoRatio()) {
-        logger.info(
+        logger.debug(
             "Effective information ratio {} (active memTables cost is {}, flushed memTables cost is {}) of wal node-{} is below wal min effective info ratio {}, some memTables will be snapshot or flushed.",
             effectiveInfoRatio,
             costOfActiveMemTables,
@@ -328,7 +346,7 @@ public class WALNode implements IWALNode {
     /**
      * Snapshot or flush one memTable,
      *
-     * @return true if snapshot or flushed
+     * @return true if snapshot or flush is executed successfully
      */
     private boolean snapshotOrFlushMemTable() {
       // find oldest memTable
@@ -336,6 +354,15 @@ public class WALNode implements IWALNode {
       if (oldestMemTableInfo == null) {
         return false;
       }
+      if (oldestMemTableInfo.isPinned()) {
+        logger.info(
+            "MemTable-{} is pinned and effective information ratio {} of wal node-{} is below wal min effective info ratio {}.",
+            oldestMemTableInfo.getMemTableId(),
+            effectiveInfoRatio,
+            identifier,
+            config.getWalMinEffectiveInfoRatio());
+        return false;
+      }
       IMemTable oldestMemTable = oldestMemTableInfo.getMemTable();
 
       // get memTable's virtual database processor
@@ -376,10 +403,12 @@ public class WALNode implements IWALNode {
             dataRegion.submitAFlushTask(
                 TsFileUtils.getTimePartition(tsFile), TsFileUtils.isSequence(tsFile), memTable);
         logger.info(
-            "WAL node-{} flushes memTable-{} to TsFile {}, memTable size is {}.",
+            "WAL node-{} flushes memTable-{} to TsFile {} because Effective information ratio {} is below wal min effective info ratio {}, memTable size is {}.",
             identifier,
             memTable.getMemTableId(),
             tsFile,
+            effectiveInfoRatio,
+            config.getWalMinEffectiveInfoRatio(),
             memTable.getTVListsRamCost());
       }
 
@@ -444,9 +473,11 @@ public class WALNode implements IWALNode {
             logger.error("Fail to snapshot memTable of {}", tsFile, flushListener.getCause());
           }
           logger.info(
-              "WAL node-{} snapshots memTable-{} to wal files, memTable size is {}.",
+              "WAL node-{} snapshots memTable-{} to wal files because Effective information ratio {} is below wal min effective info ratio {}, memTable size is {}.",
               identifier,
               memTable.getMemTableId(),
+              effectiveInfoRatio,
+              config.getWalMinEffectiveInfoRatio(),
               memTable.getTVListsRamCost());
         }
       } finally {
@@ -742,11 +773,15 @@ public class WALNode implements IWALNode {
     return buffer.getCurrentSearchIndex();
   }
 
+  @Override
+  public long getCurrentWALFileVersion() {
+    return buffer.getCurrentWALFileVersion();
+  }
+
   @Override
   public long getTotalSize() {
     return WALManager.getInstance().getTotalDiskUsage();
   }
-
   // endregion
 
   @Override
@@ -759,6 +794,11 @@ public class WALNode implements IWALNode {
     return logDirectory;
   }
 
+  /** Get the .wal file starts with the specified version id */
+  public File getWALFile(long versionId) {
+    return WALFileUtils.getWALFile(logDirectory, versionId);
+  }
+
   /** Return true when all wal entries all consumed and flushed */
   public boolean isAllWALEntriesConsumed() {
     return buffer.isAllWALEntriesConsumed();
@@ -788,4 +828,9 @@ public class WALNode implements IWALNode {
   long getCurrentLogVersion() {
     return buffer.getCurrentWALFileVersion();
   }
+
+  @TestOnly
+  CheckpointManager getCheckpointManager() {
+    return checkpointManager;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALEntryPosition.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALEntryPosition.java
new file mode 100644
index 00000000000..adab38dc5b3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALEntryPosition.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.utils;
+
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.wal.node.WALNode;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.Objects;
+
+/**
+ * This class uses the tuple(file, position, size) to denote the position of the wal entry, and give
+ * some methods to read the content from the disk.
+ */
+public class WALEntryPosition {
+  private static final WALInsertNodeCache CACHE = WALInsertNodeCache.getInstance();
+  private volatile long walFileVersionId = -1;
+  private volatile long position;
+  private volatile int size;
+  /** wal node, null when wal is disabled */
+  private WALNode walNode = null;
+  /** wal file is not null when openReadFileChannel method has been called */
+  private File walFile = null;
+
+  public WALEntryPosition() {}
+
+  public WALEntryPosition(long walFileVersionId, long position, int size) {
+    this.walFileVersionId = walFileVersionId;
+    this.position = position;
+    this.size = size;
+  }
+
+  /** Read the wal entry and parse it to the InsertNode. Use LRU cache to accelerate read. */
+  public InsertNode readInsertNodeViaCache() throws IOException {
+    if (!canRead()) {
+      throw new IOException("This entry isn't ready for read.");
+    }
+    return CACHE.get(this);
+  }
+
+  /** Read the byte buffer directly. */
+  ByteBuffer read() throws IOException {
+    if (!canRead()) {
+      throw new IOException("Target file hasn't been specified.");
+    }
+    try (FileChannel channel = openReadFileChannel()) {
+      ByteBuffer buffer = ByteBuffer.allocate(size);
+      channel.position(position);
+      channel.read(buffer);
+      buffer.clear();
+      return buffer;
+    }
+  }
+
+  /**
+   * open the read file channel for this wal entry, this method will retry automatically when the
+   * file is sealed when opening the file channel
+   */
+  public FileChannel openReadFileChannel() throws IOException {
+    if (isInSealedFile()) {
+      walFile = walNode.getWALFile(walFileVersionId);
+      return FileChannel.open(walFile.toPath(), StandardOpenOption.READ);
+    } else {
+      try {
+        walFile = walNode.getWALFile(walFileVersionId);
+        return FileChannel.open(walFile.toPath(), StandardOpenOption.READ);
+      } catch (IOException e) {
+        // unsealed file may be renamed after sealed, so we should try again
+        if (isInSealedFile()) {
+          walFile = walNode.getWALFile(walFileVersionId);
+          return FileChannel.open(walFile.toPath(), StandardOpenOption.READ);
+        } else {
+          throw e;
+        }
+      }
+    }
+  }
+
+  /** Return true only when the tuple(file, position, size) is ready. */
+  public boolean canRead() {
+    return walFileVersionId >= 0;
+  }
+
+  /** Return true only when this wal file is sealed. */
+  public boolean isInSealedFile() {
+    if (walNode == null || !canRead()) {
+      throw new RuntimeException("This entry isn't ready for read.");
+    }
+    return walFileVersionId < walNode.getCurrentWALFileVersion();
+  }
+
+  public void setWalNode(WALNode walNode) {
+    this.walNode = walNode;
+  }
+
+  public void setEntryPosition(long walFileVersionId, long position) {
+    this.position = position;
+    this.walFileVersionId = walFileVersionId;
+  }
+
+  public long getWalFileVersionId() {
+    return walFileVersionId;
+  }
+
+  public File getWalFile() {
+    return walFile;
+  }
+
+  public long getPosition() {
+    return position;
+  }
+
+  public void setSize(int size) {
+    this.size = size;
+  }
+
+  public int getSize() {
+    return size;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(walFileVersionId, position);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    WALEntryPosition that = (WALEntryPosition) o;
+    return walFileVersionId == that.walFileVersionId && position == that.position;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileUtils.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileUtils.java
index 8caade5b2ed..16efb6bc8ee 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileUtils.java
@@ -71,6 +71,17 @@ public class WALFileUtils {
     return dir.listFiles(WALFileUtils::walFilenameFilter);
   }
 
+  /** Get the .wal file starts with the specified version id in the directory */
+  public static File getWALFile(File dir, long versionId) {
+    String filePrefix = WAL_FILE_PREFIX + versionId + FILE_NAME_SEPARATOR;
+    File[] files =
+        dir.listFiles((d, name) -> walFilenameFilter(d, name) && name.startsWith(filePrefix));
+    if (files == null || files.length != 1) {
+      return null;
+    }
+    return files[0];
+  }
+
   /** Parse version id from filename */
   public static long parseVersionId(String filename) {
     Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(filename);
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCache.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCache.java
new file mode 100644
index 00000000000..736009f0212
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCache.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.utils;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.wal.buffer.WALEntry;
+import org.apache.iotdb.db.wal.buffer.WALEntryType;
+import org.apache.iotdb.db.wal.io.WALByteBufReader;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Weigher;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** This cache is used by {@link WALEntryPosition} */
+public class WALInsertNodeCache {
+  private static final Logger logger = LoggerFactory.getLogger(WALInsertNodeCache.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  /** LRU cache, find InsertNode by WALEntryPosition */
+  private final LoadingCache<WALEntryPosition, InsertNode> lruCache;
+
+  /** ids of all pinned memTables */
+  private final Set<Long> memTablesNeedSearch = ConcurrentHashMap.newKeySet();;
+
+  private WALInsertNodeCache() {
+    lruCache =
+        Caffeine.newBuilder()
+            // TODO: pipe module should determine how to configure this param
+            .maximumWeight(config.getAllocateMemoryForWALPipeCache())
+            .weigher(
+                (Weigher<WALEntryPosition, InsertNode>) (position, buffer) -> position.getSize())
+            .build(new WALInsertNodeCacheLoader());
+  }
+
+  public InsertNode get(WALEntryPosition position) {
+    InsertNode res = lruCache.getIfPresent(position);
+    // batch load from the wal file
+    if (res == null) {
+      res = lruCache.getAll(Collections.singleton(position)).get(position);
+    }
+    return res;
+  }
+
+  boolean contains(WALEntryPosition position) {
+    return lruCache.getIfPresent(position) != null;
+  }
+
+  public void addMemTable(long memTableId) {
+    memTablesNeedSearch.add(memTableId);
+  }
+
+  public void removeMemTable(long memTableId) {
+    memTablesNeedSearch.remove(memTableId);
+  }
+
+  public void clear() {
+    lruCache.invalidateAll();
+    memTablesNeedSearch.clear();
+  }
+
+  class WALInsertNodeCacheLoader implements CacheLoader<WALEntryPosition, InsertNode> {
+    private InsertNode parse(ByteBuffer buffer) {
+      PlanNode node = WALEntry.deserializeForConsensus(buffer);
+      if (node instanceof InsertNode) {
+        return (InsertNode) node;
+      } else {
+        return null;
+      }
+    }
+
+    @Override
+    public @Nullable InsertNode load(@NonNull WALEntryPosition key) throws Exception {
+      return parse(key.read());
+    }
+
+    /** Batch load all wal entries in the file when any one key is absent. */
+    @Override
+    public @NonNull Map<@NonNull WALEntryPosition, @NonNull InsertNode> loadAll(
+        @NonNull Iterable<? extends @NonNull WALEntryPosition> keys) {
+      Map<WALEntryPosition, InsertNode> res = new HashMap<>();
+      for (WALEntryPosition pos : keys) {
+        if (res.containsKey(pos) || !pos.canRead()) {
+          continue;
+        }
+        long walFileVersionId = pos.getWalFileVersionId();
+        // load one when wal file is not sealed
+        if (!pos.isInSealedFile()) {
+          try {
+            res.put(pos, load(pos));
+          } catch (Exception e) {
+            logger.info(
+                "Fail to cache wal entries from the wal file with version id {}",
+                walFileVersionId,
+                e);
+          }
+          continue;
+        }
+        // batch load when wal file is sealed
+        long position = 0;
+        try (FileChannel channel = pos.openReadFileChannel();
+            WALByteBufReader walByteBufReader = new WALByteBufReader(pos.getWalFile(), channel)) {
+          while (walByteBufReader.hasNext()) {
+            // see WALInfoEntry#serialize, entry type + memtable id + plan node type
+            ByteBuffer buffer = walByteBufReader.next();
+            int size = buffer.capacity();
+            WALEntryType type = WALEntryType.valueOf(buffer.get());
+            long memTableId = buffer.getLong();
+            if ((memTablesNeedSearch.contains(memTableId) || pos.getPosition() == position)
+                && type.needSearch()) {
+              buffer.clear();
+              InsertNode node = parse(buffer);
+              if (node != null) {
+                res.put(new WALEntryPosition(walFileVersionId, position, size), node);
+              }
+            }
+            position += size;
+          }
+        } catch (IOException e) {
+          logger.info(
+              "Fail to cache wal entries from the wal file with version id {}",
+              walFileVersionId,
+              e);
+        }
+      }
+      return res;
+    }
+  }
+
+  public static WALInsertNodeCache getInstance() {
+    return InstanceHolder.INSTANCE;
+  }
+
+  private static class InstanceHolder {
+    private InstanceHolder() {}
+
+    private static final WALInsertNodeCache INSTANCE = new WALInsertNodeCache();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java
new file mode 100644
index 00000000000..abdb4771a93
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.utils;
+
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.wal.buffer.WALEntryValue;
+import org.apache.iotdb.db.wal.exception.MemTablePinException;
+import org.apache.iotdb.db.wal.exception.WALPipeException;
+import org.apache.iotdb.db.wal.node.WALNode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This handler is used by the Pipe to find the corresponding insert node. Besides, it can try to
+ * pin/unpin the wal entries by the memTable id.
+ */
+public class WALPipeHandler {
+  private static final Logger logger = LoggerFactory.getLogger(WALPipeHandler.class);
+
+  private long memTableId = -1;
+  /** cached value, null after this value is flushed to wal successfully */
+  private volatile WALEntryValue value;
+  /** wal entry's position in the wal, valid after the value is flushed to wal successfully */
+  private final WALEntryPosition walEntryPosition = new WALEntryPosition();
+  /** wal node, null when wal is disabled */
+  private WALNode walNode = null;
+
+  public WALPipeHandler(WALEntryValue value) {
+    this.value = value;
+  }
+
+  /**
+   * Pin the wal files of the given memory table. Notice: cannot pin one memTable too long,
+   * otherwise the wal disk usage may too large.
+   *
+   * @throws MemTablePinException If the memTable has been flushed
+   */
+  public void pinMemTable() throws MemTablePinException {
+    if (walNode == null || memTableId < 0) {
+      throw new MemTablePinException("Fail to pin memTable because of internal error.");
+    }
+    walNode.pinMemTable(memTableId);
+  }
+
+  /**
+   * Unpin the wal files of the given memory table.
+   *
+   * @throws MemTablePinException If there aren't corresponding pin operations
+   */
+  public void unpinMemTable() throws MemTablePinException {
+    if (walNode == null || memTableId < 0) {
+      throw new MemTablePinException("Fail to pin memTable because of internal error.");
+    }
+    walNode.unpinMemTable(memTableId);
+  }
+
+  /** Get this handler's value */
+  public InsertNode getValue() throws WALPipeException {
+    // return local cache
+    WALEntryValue res = value;
+    if (res != null) {
+      if (res instanceof InsertNode) {
+        return (InsertNode) value;
+      } else {
+        throw new WALPipeException("Fail to get value because the entry type isn't InsertNode.");
+      }
+    }
+    // wait until the position is ready
+    while (!walEntryPosition.canRead()) {
+      try {
+        synchronized (this) {
+          this.wait();
+        }
+      } catch (InterruptedException e) {
+        logger.warn("Interrupted when waiting for result.", e);
+        Thread.currentThread().interrupt();
+      }
+    }
+    // read from the wal file
+    try {
+      return walEntryPosition.readInsertNodeViaCache();
+    } catch (Exception e) {
+      throw new WALPipeException("Fail to get value because the file content isn't correct.", e);
+    }
+  }
+
+  public void setMemTableId(long memTableId) {
+    this.memTableId = memTableId;
+  }
+
+  public void setWalNode(WALNode walNode) {
+    this.walNode = walNode;
+    this.walEntryPosition.setWalNode(walNode);
+  }
+
+  public WALEntryPosition getWalEntryPosition() {
+    return walEntryPosition;
+  }
+
+  public void setEntryPosition(long walFileVersionId, long position) {
+    this.walEntryPosition.setEntryPosition(walFileVersionId, position);
+    this.value = null;
+    synchronized (this) {
+      this.notifyAll();
+    }
+  }
+
+  public int getSize() {
+    return walEntryPosition.getSize();
+  }
+
+  public void setSize(int size) {
+    this.walEntryPosition.setSize(size);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java
index 846e47961f9..1b468635899 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java
@@ -18,9 +18,20 @@
  */
 package org.apache.iotdb.db.wal.utils.listener;
 
+import org.apache.iotdb.db.wal.buffer.WALEntryValue;
+import org.apache.iotdb.db.wal.utils.WALPipeHandler;
+
 /** This class helps judge whether wal is flushed to the storage device. */
 public class WALFlushListener extends AbstractResultListener {
-  public WALFlushListener(boolean wait) {
+  // handler for pipeline, only exists then value is InsertNode
+  private final WALPipeHandler walPipeHandler;
+
+  public WALFlushListener(boolean wait, WALEntryValue value) {
     super(wait);
+    walPipeHandler = new WALPipeHandler(value);
+  }
+
+  public WALPipeHandler getWalPipeHandler() {
+    return walPipeHandler;
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java b/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java
index 8542c817ef7..770a3c7fb26 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java
@@ -70,15 +70,14 @@ public class WALNodeTest {
   private static final String logDirectory = TestConstant.BASE_OUTPUT_PATH.concat("wal-test");
   private static final String devicePath = "root.test_sg.test_d";
   private WALMode prevMode;
+  private boolean prevIsClusterMode;
   private WALNode walNode;
 
-  private boolean isClusterMode;
-
   @Before
   public void setUp() throws Exception {
     EnvironmentUtils.cleanDir(logDirectory);
     prevMode = config.getWalMode();
-    isClusterMode = config.isClusterMode();
+    prevIsClusterMode = config.isClusterMode();
     config.setWalMode(WALMode.SYNC);
     config.setClusterMode(true);
     walNode = new WALNode(identifier, logDirectory);
@@ -88,7 +87,7 @@ public class WALNodeTest {
   public void tearDown() throws Exception {
     walNode.close();
     config.setWalMode(prevMode);
-    config.setClusterMode(isClusterMode);
+    config.setClusterMode(prevIsClusterMode);
     EnvironmentUtils.cleanDir(logDirectory);
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/node/WALPipeHandlerTest.java b/server/src/test/java/org/apache/iotdb/db/wal/node/WALPipeHandlerTest.java
new file mode 100644
index 00000000000..77c28c6d333
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/wal/node/WALPipeHandlerTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.node;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.wal.buffer.WALEntry;
+import org.apache.iotdb.db.wal.checkpoint.CheckpointManager;
+import org.apache.iotdb.db.wal.checkpoint.MemTableInfo;
+import org.apache.iotdb.db.wal.exception.MemTablePinException;
+import org.apache.iotdb.db.wal.utils.WALMode;
+import org.apache.iotdb.db.wal.utils.WALPipeHandler;
+import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class WALPipeHandlerTest {
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final String identifier = String.valueOf(Integer.MAX_VALUE);
+  private static final String logDirectory = TestConstant.BASE_OUTPUT_PATH.concat("wal-test");
+  private static final String devicePath = "root.test_sg.test_d";
+  private WALMode prevMode;
+  private boolean prevIsClusterMode;
+  private WALNode walNode;
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.cleanDir(logDirectory);
+    prevMode = config.getWalMode();
+    prevIsClusterMode = config.isClusterMode();
+    config.setWalMode(WALMode.SYNC);
+    config.setClusterMode(true);
+    walNode = new WALNode(identifier, logDirectory);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    walNode.close();
+    config.setWalMode(prevMode);
+    config.setClusterMode(prevIsClusterMode);
+    EnvironmentUtils.cleanDir(logDirectory);
+  }
+
+  @Test(expected = MemTablePinException.class)
+  public void pinDeletedMemTable() throws Exception {
+    IMemTable memTable = new PrimitiveMemTable();
+    walNode.onMemTableCreated(memTable, logDirectory + "/" + "fake.tsfile");
+    WALFlushListener flushListener =
+        walNode.log(
+            memTable.getMemTableId(), getInsertRowNode(devicePath, System.currentTimeMillis()));
+    walNode.onMemTableFlushed(memTable);
+    // pin flushed memTable
+    WALPipeHandler handler = flushListener.getWalPipeHandler();
+    handler.pinMemTable();
+  }
+
+  @Test
+  public void pinMemTable() throws Exception {
+    IMemTable memTable = new PrimitiveMemTable();
+    walNode.onMemTableCreated(memTable, logDirectory + "/" + "fake.tsfile");
+    InsertRowNode node1 = getInsertRowNode(devicePath, System.currentTimeMillis());
+    node1.setSearchIndex(1);
+    WALFlushListener flushListener = walNode.log(memTable.getMemTableId(), node1);
+    // pin memTable
+    WALPipeHandler handler = flushListener.getWalPipeHandler();
+    handler.pinMemTable();
+    walNode.onMemTableFlushed(memTable);
+    // roll wal file
+    walNode.rollWALFile();
+    walNode.rollWALFile();
+    // find node1
+    ConsensusReqReader.ReqIterator itr = walNode.getReqIterator(1);
+    assertTrue(itr.hasNext());
+    assertEquals(
+        node1,
+        WALEntry.deserializeForConsensus(itr.next().getRequests().get(0).serializeToByteBuffer()));
+    // try to delete flushed but pinned memTable
+    walNode.deleteOutdatedFiles();
+    // try to find node1
+    itr = walNode.getReqIterator(1);
+    assertTrue(itr.hasNext());
+    assertEquals(
+        node1,
+        WALEntry.deserializeForConsensus(itr.next().getRequests().get(0).serializeToByteBuffer()));
+  }
+
+  @Test(expected = MemTablePinException.class)
+  public void unpinDeletedMemTable() throws Exception {
+    IMemTable memTable = new PrimitiveMemTable();
+    walNode.onMemTableCreated(memTable, logDirectory + "/" + "fake.tsfile");
+    WALFlushListener flushListener =
+        walNode.log(
+            memTable.getMemTableId(), getInsertRowNode(devicePath, System.currentTimeMillis()));
+    walNode.onMemTableFlushed(memTable);
+    // pin flushed memTable
+    WALPipeHandler handler = flushListener.getWalPipeHandler();
+    handler.unpinMemTable();
+  }
+
+  @Test
+  public void unpinFlushedMemTable() throws Exception {
+    IMemTable memTable = new PrimitiveMemTable();
+    walNode.onMemTableCreated(memTable, logDirectory + "/" + "fake.tsfile");
+    WALFlushListener flushListener =
+        walNode.log(
+            memTable.getMemTableId(), getInsertRowNode(devicePath, System.currentTimeMillis()));
+    WALPipeHandler handler = flushListener.getWalPipeHandler();
+    // pin twice
+    handler.pinMemTable();
+    handler.pinMemTable();
+    walNode.onMemTableFlushed(memTable);
+    // unpin 1
+    CheckpointManager checkpointManager = walNode.getCheckpointManager();
+    handler.unpinMemTable();
+    MemTableInfo oldestMemTableInfo = checkpointManager.getOldestMemTableInfo();
+    assertEquals(memTable.getMemTableId(), oldestMemTableInfo.getMemTableId());
+    assertNull(oldestMemTableInfo.getMemTable());
+    assertTrue(oldestMemTableInfo.isPinned());
+    // unpin 2
+    handler.unpinMemTable();
+    assertNull(checkpointManager.getOldestMemTableInfo());
+  }
+
+  @Test
+  public void unpinMemTable() throws Exception {
+    IMemTable memTable = new PrimitiveMemTable();
+    walNode.onMemTableCreated(memTable, logDirectory + "/" + "fake.tsfile");
+    InsertRowNode node1 = getInsertRowNode(devicePath, System.currentTimeMillis());
+    node1.setSearchIndex(1);
+    WALFlushListener flushListener = walNode.log(memTable.getMemTableId(), node1);
+    // pin memTable
+    WALPipeHandler handler = flushListener.getWalPipeHandler();
+    handler.pinMemTable();
+    walNode.onMemTableFlushed(memTable);
+    // roll wal file
+    walNode.rollWALFile();
+    walNode.rollWALFile();
+    // find node1
+    ConsensusReqReader.ReqIterator itr = walNode.getReqIterator(1);
+    assertTrue(itr.hasNext());
+    assertEquals(
+        node1,
+        WALEntry.deserializeForConsensus(itr.next().getRequests().get(0).serializeToByteBuffer()));
+    // unpin flushed memTable
+    handler.unpinMemTable();
+    // try to delete flushed but pinned memTable
+    walNode.deleteOutdatedFiles();
+    // try to find node1
+    itr = walNode.getReqIterator(1);
+    assertFalse(itr.hasNext());
+  }
+
+  @Test
+  public void getUnFlushedValue() throws Exception {
+    IMemTable memTable = new PrimitiveMemTable();
+    walNode.onMemTableCreated(memTable, logDirectory + "/" + "fake.tsfile");
+    InsertRowNode node1 = getInsertRowNode(devicePath, System.currentTimeMillis());
+    node1.setSearchIndex(1);
+    WALFlushListener flushListener = walNode.log(memTable.getMemTableId(), node1);
+    // pin memTable
+    WALPipeHandler handler = flushListener.getWalPipeHandler();
+    handler.pinMemTable();
+    walNode.onMemTableFlushed(memTable);
+    assertEquals(node1, handler.getValue());
+  }
+
+  @Test
+  public void getFlushedValue() throws Exception {
+    IMemTable memTable = new PrimitiveMemTable();
+    walNode.onMemTableCreated(memTable, logDirectory + "/" + "fake.tsfile");
+    InsertRowNode node1 = getInsertRowNode(devicePath, System.currentTimeMillis());
+    node1.setSearchIndex(1);
+    WALFlushListener flushListener = walNode.log(memTable.getMemTableId(), node1);
+    // pin memTable
+    WALPipeHandler handler = flushListener.getWalPipeHandler();
+    handler.pinMemTable();
+    walNode.onMemTableFlushed(memTable);
+    // wait until wal flushed
+    while (!walNode.isAllWALEntriesConsumed()) {
+      Thread.sleep(50);
+    }
+    assertEquals(node1, handler.getValue());
+  }
+
+  private InsertRowNode getInsertRowNode(String devicePath, long time) throws IllegalPathException {
+    TSDataType[] dataTypes =
+        new TSDataType[] {
+          TSDataType.DOUBLE,
+          TSDataType.FLOAT,
+          TSDataType.INT64,
+          TSDataType.INT32,
+          TSDataType.BOOLEAN,
+          TSDataType.TEXT
+        };
+
+    Object[] columns = new Object[6];
+    columns[0] = 1.0d;
+    columns[1] = 2f;
+    columns[2] = 10000L;
+    columns[3] = 100;
+    columns[4] = false;
+    columns[5] = new Binary("hh" + 0);
+
+    InsertRowNode node =
+        new InsertRowNode(
+            new PlanNodeId(""),
+            new PartialPath(devicePath),
+            false,
+            new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+            dataTypes,
+            time,
+            columns,
+            false);
+    MeasurementSchema[] schemas = new MeasurementSchema[6];
+    for (int i = 0; i < 6; i++) {
+      schemas[i] = new MeasurementSchema("s" + (i + 1), dataTypes[i]);
+    }
+    node.setMeasurementSchemas(schemas);
+    return node;
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCacheTest.java b/server/src/test/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCacheTest.java
new file mode 100644
index 00000000000..98459619a70
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCacheTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.utils;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.wal.node.WALNode;
+import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class WALInsertNodeCacheTest {
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final String identifier = String.valueOf(Integer.MAX_VALUE);
+  private static final String logDirectory = TestConstant.BASE_OUTPUT_PATH.concat("wal-test");
+  private static final String devicePath = "root.test_sg.test_d";
+  private static final WALInsertNodeCache cache = WALInsertNodeCache.getInstance();
+  private WALMode prevMode;
+  private boolean prevIsClusterMode;
+  private WALNode walNode;
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.cleanDir(logDirectory);
+    cache.clear();
+    prevMode = config.getWalMode();
+    prevIsClusterMode = config.isClusterMode();
+    config.setWalMode(WALMode.SYNC);
+    config.setClusterMode(true);
+    walNode = new WALNode(identifier, logDirectory);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    walNode.close();
+    cache.clear();
+    config.setWalMode(prevMode);
+    config.setClusterMode(prevIsClusterMode);
+    EnvironmentUtils.cleanDir(logDirectory);
+  }
+
+  @Test
+  public void testLoadUnsealedWALFile() throws Exception {
+    IMemTable memTable = new PrimitiveMemTable();
+    walNode.onMemTableCreated(memTable, logDirectory + "/" + "fake.tsfile");
+    InsertRowNode node1 = getInsertRowNode(devicePath, System.currentTimeMillis());
+    node1.setSearchIndex(1);
+    WALFlushListener flushListener = walNode.log(memTable.getMemTableId(), node1);
+    WALEntryPosition position = flushListener.getWalPipeHandler().getWalEntryPosition();
+    // wait until wal flushed
+    while (!walNode.isAllWALEntriesConsumed() || !position.canRead()) {
+      Thread.sleep(50);
+    }
+    // load by cache
+    assertEquals(node1, cache.get(position));
+  }
+
+  @Test
+  public void testBatchLoad() throws Exception {
+    // write memTable1
+    IMemTable memTable1 = new PrimitiveMemTable();
+    walNode.onMemTableCreated(memTable1, logDirectory + "/" + "fake1.tsfile");
+    InsertRowNode node1 = getInsertRowNode(devicePath, System.currentTimeMillis());
+    node1.setSearchIndex(1);
+    WALFlushListener flushListener1 = walNode.log(memTable1.getMemTableId(), node1);
+    WALEntryPosition position1 = flushListener1.getWalPipeHandler().getWalEntryPosition();
+    InsertRowNode node2 = getInsertRowNode(devicePath, System.currentTimeMillis());
+    node1.setSearchIndex(2);
+    WALFlushListener flushListener2 = walNode.log(memTable1.getMemTableId(), node2);
+    WALEntryPosition position2 = flushListener2.getWalPipeHandler().getWalEntryPosition();
+    // write memTable2
+    IMemTable memTable2 = new PrimitiveMemTable();
+    walNode.onMemTableCreated(memTable2, logDirectory + "/" + "fake2.tsfile");
+    InsertRowNode node3 = getInsertRowNode(devicePath, System.currentTimeMillis());
+    node1.setSearchIndex(3);
+    WALFlushListener flushListener3 = walNode.log(memTable2.getMemTableId(), node3);
+    WALEntryPosition position3 = flushListener3.getWalPipeHandler().getWalEntryPosition();
+    // wait until wal flushed
+    walNode.rollWALFile();
+    while (!walNode.isAllWALEntriesConsumed() || !position3.canRead()) {
+      Thread.sleep(50);
+    }
+    // check batch load memTable1
+    cache.addMemTable(memTable1.getMemTableId());
+    assertEquals(node1, cache.get(position1));
+    assertTrue(cache.contains(position1));
+    assertTrue(cache.contains(position2));
+    assertFalse(cache.contains(position3));
+    // check batch load none
+    cache.removeMemTable(memTable1.getMemTableId());
+    cache.clear();
+    assertEquals(node1, cache.get(position1));
+    assertTrue(cache.contains(position1));
+    assertFalse(cache.contains(position2));
+    assertFalse(cache.contains(position3));
+  }
+
+  private InsertRowNode getInsertRowNode(String devicePath, long time) throws IllegalPathException {
+    TSDataType[] dataTypes =
+        new TSDataType[] {
+          TSDataType.DOUBLE,
+          TSDataType.FLOAT,
+          TSDataType.INT64,
+          TSDataType.INT32,
+          TSDataType.BOOLEAN,
+          TSDataType.TEXT
+        };
+
+    Object[] columns = new Object[6];
+    columns[0] = 1.0d;
+    columns[1] = 2f;
+    columns[2] = 10000L;
+    columns[3] = 100;
+    columns[4] = false;
+    columns[5] = new Binary("hh" + 0);
+
+    InsertRowNode node =
+        new InsertRowNode(
+            new PlanNodeId(""),
+            new PartialPath(devicePath),
+            false,
+            new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+            dataTypes,
+            time,
+            columns,
+            false);
+    MeasurementSchema[] schemas = new MeasurementSchema[6];
+    for (int i = 0; i < 6; i++) {
+      schemas[i] = new MeasurementSchema("s" + (i + 1), dataTypes[i]);
+    }
+    node.setMeasurementSchemas(schemas);
+    return node;
+  }
+}