You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/01/12 07:51:46 UTC

[iotdb] 01/01: finish

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch DirectByteBuffer
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 16e3dc365107b2eebeb9644eadafc5cb56517269
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Tue Jan 12 15:50:48 2021 +0800

    finish
---
 .../db/engine/storagegroup/StorageGroupInfo.java   | 51 +++++++++++--
 .../engine/storagegroup/StorageGroupProcessor.java | 87 +++++++++++++++++++++-
 .../db/engine/storagegroup/TsFileProcessor.java    | 12 +--
 .../java/org/apache/iotdb/db/utils/MmapUtil.java   | 71 ++++++++++++++++++
 .../writelog/manager/MultiFileLogNodeManager.java  | 36 +++++----
 .../db/writelog/manager/WriteLogNodeManager.java   |  7 +-
 .../db/writelog/node/ExclusiveWriteLogNode.java    | 24 ++++--
 .../iotdb/db/writelog/node/WriteLogNode.java       |  3 +-
 .../iotdb/db/writelog/recover/LogReplayer.java     | 12 ++-
 .../writelog/recover/TsFileRecoverPerformer.java   | 16 ++--
 .../iotdb/db/writelog/IoTDBLogFileSizeTest.java    | 25 ++++++-
 .../apache/iotdb/db/writelog/PerformanceTest.java  | 50 ++++++++++---
 .../iotdb/db/writelog/WriteLogNodeManagerTest.java | 63 +++++++++++++---
 .../apache/iotdb/db/writelog/WriteLogNodeTest.java | 78 +++++++++++++++----
 .../iotdb/db/writelog/recover/LogReplayerTest.java | 24 +++++-
 .../recover/RecoverResourceFromReaderTest.java     | 54 +++++++++++---
 .../db/writelog/recover/SeqTsFileRecoverTest.java  | 51 +++++++++++--
 .../writelog/recover/UnseqTsFileRecoverTest.java   | 31 +++++++-
 18 files changed, 579 insertions(+), 116 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
index a31d41a..d0620a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
@@ -18,12 +18,18 @@
  */
 package org.apache.iotdb.db.engine.storagegroup;
 
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicLong;
 
+import java.util.function.Consumer;
+import java.util.function.Supplier;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.rescon.SystemInfo;
+import org.apache.iotdb.db.utils.MmapUtil;
+import org.apache.iotdb.db.utils.TestOnly;
 
 /**
  * The storageGroupInfo records the total memory cost of the Storage Group.
@@ -33,15 +39,15 @@ public class StorageGroupInfo {
   private StorageGroupProcessor storageGroupProcessor;
 
   /**
-   * The total Storage group memory cost,
-   * including unsealed TsFileResource, ChunkMetadata, WAL, primitive arrays and TEXT values
+   * The total Storage group memory cost, including unsealed TsFileResource, ChunkMetadata, WAL,
+   * primitive arrays and TEXT values
    */
   private AtomicLong memoryCost;
 
   /**
    * The threshold of reporting it's size to SystemInfo
    */
-  private long storageGroupSizeReportThreshold = 
+  private long storageGroupSizeReportThreshold =
       IoTDBDescriptor.getInstance().getConfig().getStorageGroupSizeReportThreshold();
 
   private AtomicLong lastReportedSize = new AtomicLong();
@@ -94,13 +100,46 @@ public class StorageGroupInfo {
   }
 
   /**
-   * When a TsFileProcessor is closing, remove it from reportedTsps, and report to systemInfo
-   * to update SG cost.
-   * 
+   * When a TsFileProcessor is closing, remove it from reportedTsps, and report to systemInfo to
+   * update SG cost.
+   *
    * @param tsFileProcessor
    */
   public void closeTsFileProcessorAndReportToSystem(TsFileProcessor tsFileProcessor) {
     reportedTsps.remove(tsFileProcessor);
     SystemInfo.getInstance().resetStorageGroupStatus(this, true);
   }
+
+  public Supplier<ByteBuffer[]> getWalSupplier() {
+    if (storageGroupProcessor != null) {
+      return storageGroupProcessor::getWalDirectByteBuffer;
+    } else { // only happens in test
+      return this::walSupplier;
+    }
+  }
+
+  @TestOnly
+  private ByteBuffer[] walSupplier() {
+    ByteBuffer[] buffers = new ByteBuffer[2];
+    buffers[0] = ByteBuffer
+        .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+    buffers[0] = ByteBuffer
+        .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+    return buffers;
+  }
+
+  public Consumer<ByteBuffer[]> getWalConsumer() {
+    if (storageGroupProcessor != null) {
+      return storageGroupProcessor::releaseWalBuffer;
+    } else { // only happens in test
+      return this::walConsumer;
+    }
+  }
+
+  @TestOnly
+  private void walConsumer(ByteBuffer[] buffers) {
+    for (ByteBuffer byteBuffer : buffers) {
+      MmapUtil.clean((MappedByteBuffer) byteBuffer);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index d32386f..9e2479b 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -25,11 +25,14 @@ import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFF
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -38,7 +41,10 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.io.FileUtils;
@@ -84,6 +90,7 @@ import org.apache.iotdb.db.query.control.QueryFileManager;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.service.UpgradeSevice;
 import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
+import org.apache.iotdb.db.utils.MmapUtil;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -261,6 +268,79 @@ public class StorageGroupProcessor {
   private List<CloseFileListener> customCloseFileListeners = Collections.emptyList();
   private List<FlushListener> customFlushListeners = Collections.emptyList();
 
+  private static final int WAL_BUFFER_SIZE =
+      IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2;
+
+  private static final int MAX_WAL_BYTEBUFFER_NUM =
+      IoTDBDescriptor.getInstance().getConfig().getConcurrentWritingTimePartition() * 4;
+
+  private static final long DEFAULT_POOL_TRIM_INTERVAL_MILLIS = 10_000;
+
+  private final Deque<ByteBuffer> walByteBufferPool = new LinkedList<>();
+
+  private int currentWalPoolSize = 0;
+
+
+  /**
+   * get the direct byte buffer from pool, each fetch contains two ByteBuffer
+   */
+  public ByteBuffer[] getWalDirectByteBuffer() {
+    ByteBuffer[] res = new ByteBuffer[2];
+    synchronized (walByteBufferPool) {
+      while (walByteBufferPool.isEmpty() && currentWalPoolSize + 2 > MAX_WAL_BYTEBUFFER_NUM) {
+        try {
+          walByteBufferPool.wait();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          logger
+              .error("getDirectByteBuffer occurs error while waiting for DirectByteBuffer"
+                  + "group {}", storageGroupName, e);
+        }
+      }
+      // If the queue is not empty, it must have at least two.
+      if (!walByteBufferPool.isEmpty()) {
+        res[0] = walByteBufferPool.pollFirst();
+        res[1] = walByteBufferPool.pollFirst();
+      } else {
+        // if the queue is empty and current size is less than MAX_BYTEBUFFER_NUM
+        // we can construct another two more new byte buffer
+        currentWalPoolSize += 2;
+        res[0] = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE);
+        res[1] = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE);
+      }
+    }
+    return res;
+  }
+
+  /**
+   * put the byteBuffer back to pool
+   */
+  public void releaseWalBuffer(ByteBuffer[] byteBuffers) {
+    for (ByteBuffer byteBuffer : byteBuffers) {
+      byteBuffer.clear();
+    }
+    synchronized (walByteBufferPool) {
+      walByteBufferPool.addLast(byteBuffers[0]);
+      walByteBufferPool.addLast(byteBuffers[1]);
+      walByteBufferPool.notifyAll();
+    }
+  }
+
+  /**
+   * trim the size of the pool and release the memory of needless direct byte buffer
+   */
+  private void trimTask() {
+    synchronized (walByteBufferPool) {
+      int expectedSize =
+          (workSequenceTsFileProcessors.size() + workUnsequenceTsFileProcessors.size()) * 2;
+      while (expectedSize < currentWalPoolSize && !walByteBufferPool.isEmpty()) {
+        MmapUtil.clean((MappedByteBuffer) walByteBufferPool.removeLast());
+        MmapUtil.clean((MappedByteBuffer) walByteBufferPool.removeLast());
+        currentWalPoolSize -= 2;
+      }
+    }
+  }
+
   public StorageGroupProcessor(String systemDir, String storageGroupName,
       TsFileFlushPolicy fileFlushPolicy) throws StorageGroupProcessorException {
     this.storageGroupName = storageGroupName;
@@ -277,6 +357,9 @@ public class StorageGroupProcessor {
     this.tsFileManagement = IoTDBDescriptor.getInstance().getConfig().getCompactionStrategy()
         .getTsFileManagement(storageGroupName, storageGroupSysDir.getAbsolutePath());
 
+    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+    executorService.scheduleWithFixedDelay(this::trimTask, DEFAULT_POOL_TRIM_INTERVAL_MILLIS,
+        DEFAULT_POOL_TRIM_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
     recover();
 
   }
@@ -580,7 +663,7 @@ public class StorageGroupProcessor {
       try {
         // this tsfile is not zero level, no need to perform redo wal
         if (LevelCompactionTsFileManagement.getMergeLevel(tsFileResource.getTsFile()) > 0) {
-          writer = recoverPerformer.recover(false);
+          writer = recoverPerformer.recover(false, this::getWalDirectByteBuffer, this::releaseWalBuffer);
           if (writer.hasCrashed()) {
             tsFileManagement.addRecover(tsFileResource, isSeq);
           } else {
@@ -589,7 +672,7 @@ public class StorageGroupProcessor {
           }
           continue;
         } else {
-          writer = recoverPerformer.recover(true);
+          writer = recoverPerformer.recover(true, this::getWalDirectByteBuffer, this::releaseWalBuffer);
         }
       } catch (StorageGroupProcessorException e) {
         logger.warn("Skip TsFile: {} because of error in recover: ", tsFileResource.getTsFilePath(),
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 5b404c9..c632782 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -177,8 +177,7 @@ public class TsFileProcessor {
       if (enableMemControl) {
         workMemTable = new PrimitiveMemTable(enableMemControl);
         MemTableManager.getInstance().addMemtableNumber();
-      }
-      else {
+      } else {
         workMemTable = MemTableManager.getInstance().getAvailableMemTable(storageGroupName);
       }
     }
@@ -226,8 +225,7 @@ public class TsFileProcessor {
       if (enableMemControl) {
         workMemTable = new PrimitiveMemTable(enableMemControl);
         MemTableManager.getInstance().addMemtableNumber();
-      }
-      else {
+      } else {
         workMemTable = MemTableManager.getInstance().getAvailableMemTable(storageGroupName);
       }
     }
@@ -871,7 +869,8 @@ public class TsFileProcessor {
   public WriteLogNode getLogNode() {
     if (logNode == null) {
       logNode = MultiFileLogNodeManager.getInstance()
-          .getNode(storageGroupName + "-" + tsFileResource.getTsFile().getName());
+          .getNode(storageGroupName + "-" + tsFileResource.getTsFile().getName(),
+              storageGroupInfo.getWalSupplier());
     }
     return logNode;
   }
@@ -881,7 +880,8 @@ public class TsFileProcessor {
       // when closing resource file, its corresponding mod file is also closed.
       tsFileResource.close();
       MultiFileLogNodeManager.getInstance()
-          .deleteNode(storageGroupName + "-" + tsFileResource.getTsFile().getName());
+          .deleteNode(storageGroupName + "-" + tsFileResource.getTsFile().getName(),
+              storageGroupInfo.getWalConsumer());
     } catch (IOException e) {
       throw new TsFileProcessorException(e);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MmapUtil.java b/server/src/main/java/org/apache/iotdb/db/utils/MmapUtil.java
new file mode 100644
index 0000000..8ca1cc6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/MmapUtil.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils;
+
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+public class MmapUtil {
+
+  public static void clean(MappedByteBuffer mappedByteBuffer) {
+    if (mappedByteBuffer == null || !mappedByteBuffer.isDirect() || mappedByteBuffer.capacity()== 0)
+      return;
+    invoke(invoke(viewed(mappedByteBuffer), "cleaner"), "clean");
+  }
+
+  private static Object invoke(final Object target, final String methodName, final Class<?>... args) {
+    return AccessController.doPrivileged((PrivilegedAction<Object>) () -> {
+      try {
+        Method method = method(target, methodName, args);
+        method.setAccessible(true);
+        return method.invoke(target);
+      } catch (Exception e) {
+        throw new IllegalStateException(e);
+      }
+    });
+  }
+
+  private static Method method(Object target, String methodName, Class<?>[] args)
+      throws NoSuchMethodException {
+    try {
+      return target.getClass().getMethod(methodName, args);
+    } catch (NoSuchMethodException e) {
+      return target.getClass().getDeclaredMethod(methodName, args);
+    }
+  }
+
+  private static ByteBuffer viewed(ByteBuffer buffer) {
+    String methodName = "viewedBuffer";
+    Method[] methods = buffer.getClass().getMethods();
+    for (Method method : methods) {
+      if (method.getName().equals("attachment")) {
+        methodName = "attachment";
+        break;
+      }
+    }
+    ByteBuffer viewedBuffer = (ByteBuffer) invoke(buffer, methodName);
+    if (viewedBuffer == null)
+      return buffer;
+    else
+      return viewed(viewedBuffer);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java b/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
index f07d69f..00b9c44 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
@@ -19,11 +19,14 @@
 package org.apache.iotdb.db.writelog.manager;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.StartupException;
@@ -35,18 +38,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * MultiFileLogNodeManager manages all ExclusiveWriteLogNodes, each manages WALs of a TsFile
- * (either seq or unseq).
+ * MultiFileLogNodeManager manages all ExclusiveWriteLogNodes, each manages WALs of a TsFile (either
+ * seq or unseq).
  */
 public class MultiFileLogNodeManager implements WriteLogNodeManager, IService {
 
   private static final Logger logger = LoggerFactory.getLogger(MultiFileLogNodeManager.class);
-  private Map<String, WriteLogNode> nodeMap;
+  private final Map<String, WriteLogNode> nodeMap;
 
   private ScheduledExecutorService executorService;
-  private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
-  private final void forceTask() {
+  private void forceTask() {
     if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
       logger.warn("system mode is read-only, the force flush WAL task is stopped");
       return;
@@ -75,23 +78,16 @@ public class MultiFileLogNodeManager implements WriteLogNodeManager, IService {
 
 
   @Override
-  public WriteLogNode getNode(String identifier) {
-    WriteLogNode node = nodeMap.get(identifier);
-    if (node == null) {
-      node = new ExclusiveWriteLogNode(identifier);
-      WriteLogNode oldNode = nodeMap.putIfAbsent(identifier, node);
-      if (oldNode != null) {
-        return oldNode;
-      }
-    }
-    return node;
+  public WriteLogNode getNode(String identifier, Supplier<ByteBuffer[]> supplier) {
+    return nodeMap
+        .computeIfAbsent(identifier, key -> new ExclusiveWriteLogNode(key, supplier.get()));
   }
 
   @Override
-  public void deleteNode(String identifier) throws IOException {
+  public void deleteNode(String identifier, Consumer<ByteBuffer[]> consumer) throws IOException {
     WriteLogNode node = nodeMap.remove(identifier);
     if (node != null) {
-      node.delete();
+      consumer.accept(node.delete());
     }
   }
 
@@ -148,9 +144,11 @@ public class MultiFileLogNodeManager implements WriteLogNodeManager, IService {
   }
 
   private static class InstanceHolder {
-    private InstanceHolder(){}
 
-    private static MultiFileLogNodeManager instance = new MultiFileLogNodeManager();
+    private InstanceHolder() {
+    }
+
+    private static final MultiFileLogNodeManager instance = new MultiFileLogNodeManager();
   }
 
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/manager/WriteLogNodeManager.java b/server/src/main/java/org/apache/iotdb/db/writelog/manager/WriteLogNodeManager.java
index 84d02fc..f46d57a 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/manager/WriteLogNodeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/manager/WriteLogNodeManager.java
@@ -19,6 +19,9 @@
 package org.apache.iotdb.db.writelog.manager;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
 import org.apache.iotdb.db.writelog.node.WriteLogNode;
 
 /**
@@ -33,14 +36,14 @@ public interface WriteLogNodeManager {
    * @param identifier -identifier, the format: "{storageGroupName}-{BufferWrite/Overflow}-{
    * nameOfTsFile}"
    */
-  WriteLogNode getNode(String identifier);
+  WriteLogNode getNode(String identifier, Supplier<ByteBuffer[]> supplier);
 
   /**
    * Delete a log node. If the log node does not exist, this will be an empty operation.
    *
    * @param identifier -identifier
    */
-  void deleteNode(String identifier) throws IOException;
+  void deleteNode(String identifier, Consumer<ByteBuffer[]> consumer) throws IOException;
 
   /**
    * Close all nodes.
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index f241db5..6c73713 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -59,10 +59,8 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
 
   private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
-  private ByteBuffer logBufferWorking = ByteBuffer
-      .allocate(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
-  private ByteBuffer logBufferIdle = ByteBuffer
-      .allocate(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+  private ByteBuffer logBufferWorking;
+  private ByteBuffer logBufferIdle;
   private ByteBuffer logBufferFlushing;
 
   private final Object switchBufferCondition = new Object();
@@ -83,10 +81,12 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
    *
    * @param identifier ExclusiveWriteLogNode identifier
    */
-  public ExclusiveWriteLogNode(String identifier) {
+  public ExclusiveWriteLogNode(String identifier, ByteBuffer[] byteBuffers) {
     this.identifier = identifier;
     this.logDirectory =
         DirectoryManager.getInstance().getWALFolder() + File.separator + this.identifier;
+    this.logBufferWorking = byteBuffers[0];
+    this.logBufferIdle = byteBuffers[1];
     if (SystemFileFactory.INSTANCE.getFile(logDirectory).mkdirs()) {
       logger.info("create the WAL folder {}.", logDirectory);
     }
@@ -197,12 +197,24 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
   }
 
   @Override
-  public void delete() throws IOException {
+  public ByteBuffer[] delete() throws IOException {
     lock.lock();
     try {
       close();
       FileUtils.deleteDirectory(SystemFileFactory.INSTANCE.getFile(logDirectory));
       deleted = true;
+      ByteBuffer[] res = new ByteBuffer[2];
+      int index = 0;
+      if (logBufferWorking != null) {
+        res[index++] = logBufferWorking;
+      }
+      if (logBufferIdle != null) {
+        res[index++] = logBufferIdle;
+      }
+      if (logBufferFlushing != null) {
+        res[index] = logBufferFlushing;
+      }
+      return res;
     } finally {
       lock.unlock();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java b/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
index a93117b..4ca048f 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.writelog.node;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.writelog.io.ILogReader;
 
@@ -76,7 +77,7 @@ public interface WriteLogNode {
    * Abandon all logs in this node and delete the log directory. Calling insert() after calling
    * this method is undefined.
    */
-  void delete() throws IOException;
+  ByteBuffer[] delete() throws IOException;
 
   /**
    * return an ILogReader which can iterate each log in this log node.
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
index 4be75cc..2426414 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
@@ -20,9 +20,11 @@
 package org.apache.iotdb.db.writelog.recover;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.modification.Deletion;
@@ -85,9 +87,10 @@ public class LogReplayer {
    * finds the logNode of the TsFile given by insertFilePath and logNodePrefix, reads the WALs from
    * the logNode and redoes them into a given MemTable and ModificationFile.
    */
-  public void replayLogs() {
-    WriteLogNode logNode = MultiFileLogNodeManager.getInstance().getNode(
-        logNodePrefix + FSFactoryProducer.getFSFactory().getFile(insertFilePath).getName());
+  public void replayLogs(Supplier<ByteBuffer[]> supplier) {
+    WriteLogNode logNode = MultiFileLogNodeManager.getInstance()
+        .getNode(logNodePrefix + FSFactoryProducer.getFSFactory().getFile(insertFilePath).getName(),
+            supplier);
 
     ILogReader logReader = logNode.getLogReader();
     try {
@@ -171,7 +174,8 @@ public class LogReplayer {
     if (plan instanceof InsertRowPlan) {
       recoverMemTable.insert((InsertRowPlan) plan);
     } else {
-      recoverMemTable.insertTablet((InsertTabletPlan) plan, 0, ((InsertTabletPlan) plan).getRowCount());
+      recoverMemTable
+          .insertTablet((InsertTabletPlan) plan, 0, ((InsertTabletPlan) plan).getRowCount());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index 8120d00..3418847 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -23,10 +23,13 @@ import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.RESOURCE_SU
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -81,8 +84,8 @@ public class TsFileRecoverPerformer {
    * writing
    */
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
-  public RestorableTsFileIOWriter recover(boolean needRedoWal)
-      throws StorageGroupProcessorException {
+  public RestorableTsFileIOWriter recover(boolean needRedoWal, Supplier<ByteBuffer[]> supplier,
+      Consumer<ByteBuffer[]> consumer) throws StorageGroupProcessorException {
 
     File file = FSFactoryProducer.getFSFactory().getFile(filePath);
     if (!file.exists()) {
@@ -120,12 +123,13 @@ public class TsFileRecoverPerformer {
 
     // redo logs
     if (needRedoWal) {
-      redoLogs(restorableTsFileIOWriter);
+      redoLogs(restorableTsFileIOWriter, supplier);
 
       // clean logs
       try {
         MultiFileLogNodeManager.getInstance()
-            .deleteNode(logNodePrefix + SystemFileFactory.INSTANCE.getFile(filePath).getName());
+            .deleteNode(logNodePrefix + SystemFileFactory.INSTANCE.getFile(filePath).getName(),
+                consumer);
       } catch (IOException e) {
         throw new StorageGroupProcessorException(e);
       }
@@ -196,13 +200,13 @@ public class TsFileRecoverPerformer {
     tsFileResource.updatePlanIndexes(restorableTsFileIOWriter.getMaxPlanIndex());
   }
 
-  private void redoLogs(RestorableTsFileIOWriter restorableTsFileIOWriter)
+  private void redoLogs(RestorableTsFileIOWriter restorableTsFileIOWriter, Supplier<ByteBuffer[]> supplier)
       throws StorageGroupProcessorException {
     IMemTable recoverMemTable = new PrimitiveMemTable();
     recoverMemTable.setVersion(versionController.nextVersion());
     LogReplayer logReplayer = new LogReplayer(logNodePrefix, filePath, tsFileResource.getModFile(),
         versionController, tsFileResource, recoverMemTable, sequence);
-    logReplayer.replayLogs();
+    logReplayer.replayLogs(supplier);
     try {
       if (!recoverMemTable.isEmpty()) {
         // flush logs
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/IoTDBLogFileSizeTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/IoTDBLogFileSizeTest.java
index 2a7a5ed..8386977 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/IoTDBLogFileSizeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/IoTDBLogFileSizeTest.java
@@ -20,12 +20,15 @@
 package org.apache.iotdb.db.writelog;
 
 import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.Statement;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.MmapUtil;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 import org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode;
 import org.apache.iotdb.db.writelog.node.WriteLogNode;
@@ -57,7 +60,7 @@ public class IoTDBLogFileSizeTest {
       return;
     }
     groupSize = TSFileDescriptor.getInstance().getConfig().getGroupSizeInByte();
-    TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte( 8 * 1024 * 1024);
+    TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte(8 * 1024 * 1024);
     IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(8 * 1024 * 1024);
     EnvironmentUtils.closeStatMonitor();
     EnvironmentUtils.envSetUp();
@@ -81,6 +84,11 @@ public class IoTDBLogFileSizeTest {
       return;
     }
     final long[] maxLength = {0};
+    ByteBuffer[] buffers = new ByteBuffer[2];
+    buffers[0] = ByteBuffer
+        .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+    buffers[1] = ByteBuffer
+        .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
     Thread writeThread = new Thread(() -> {
       int cnt = 0;
       try {
@@ -100,7 +108,7 @@ public class IoTDBLogFileSizeTest {
               cnt);
           statement.execute(sql);
           WriteLogNode logNode = MultiFileLogNodeManager.getInstance().getNode(
-              "root.logFileTest.seq" + IoTDBConstant.SEQFILE_LOG_NODE_SUFFIX);
+              "root.logFileTest.seq" + IoTDBConstant.SEQFILE_LOG_NODE_SUFFIX, () -> buffers);
           File bufferWriteWALFile = new File(
               logNode.getLogDirectory() + File.separator + ExclusiveWriteLogNode.WAL_FILE_NAME);
           if (bufferWriteWALFile.exists() && bufferWriteWALFile.length() > maxLength[0]) {
@@ -117,6 +125,9 @@ public class IoTDBLogFileSizeTest {
     while (writeThread.isAlive()) {
 
     }
+    for (ByteBuffer byteBuffer : buffers) {
+      MmapUtil.clean((MappedByteBuffer) byteBuffer);
+    }
   }
 
   @Test
@@ -124,6 +135,11 @@ public class IoTDBLogFileSizeTest {
     if (skip) {
       return;
     }
+    ByteBuffer[] buffers = new ByteBuffer[2];
+    buffers[0] = ByteBuffer
+        .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+    buffers[1] = ByteBuffer
+        .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
     final long[] maxLength = {0};
     Thread writeThread = new Thread(() -> {
       int cnt = 0;
@@ -143,7 +159,7 @@ public class IoTDBLogFileSizeTest {
                   ++cnt, cnt);
           statement.execute(sql);
           WriteLogNode logNode = MultiFileLogNodeManager.getInstance()
-              .getNode("root.logFileTest.unsequence" + IoTDBConstant.UNSEQFILE_LOG_NODE_SUFFIX);
+              .getNode("root.logFileTest.unsequence" + IoTDBConstant.UNSEQFILE_LOG_NODE_SUFFIX, () -> buffers);
           File WALFile = new File(
               logNode.getLogDirectory() + File.separator + ExclusiveWriteLogNode.WAL_FILE_NAME);
           if (WALFile.exists() && WALFile.length() > maxLength[0]) {
@@ -160,6 +176,9 @@ public class IoTDBLogFileSizeTest {
     while (writeThread.isAlive()) {
 
     }
+    for (ByteBuffer byteBuffer : buffers) {
+      MmapUtil.clean((MappedByteBuffer) byteBuffer);
+    }
   }
 
   private void executeSQL(String[] sqls) throws ClassNotFoundException {
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
index 1d6e918..cc62db0 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
@@ -20,6 +20,8 @@ package org.apache.iotdb.db.writelog;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
 import java.util.Collections;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -31,6 +33,7 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.MmapUtil;
 import org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode;
 import org.apache.iotdb.db.writelog.node.WriteLogNode;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -82,13 +85,20 @@ public class PerformanceTest {
         tempRestore.createNewFile();
         tempProcessorStore.createNewFile();
 
-        WriteLogNode logNode = new ExclusiveWriteLogNode("root.testLogNode");
+        ByteBuffer[] byteBuffers = new ByteBuffer[2];
+        byteBuffers[0] = ByteBuffer
+            .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+        byteBuffers[1] = ByteBuffer
+            .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+
+        WriteLogNode logNode = new ExclusiveWriteLogNode("root.testLogNode", byteBuffers);
 
         long time = System.currentTimeMillis();
         for (int i = 0; i < 1000000; i++) {
           InsertRowPlan bwInsertPlan = new InsertRowPlan(new PartialPath("logTestDevice"), 100,
               new String[]{"s1", "s2", "s3", "s4"},
-              new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN},
+              new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT,
+                  TSDataType.BOOLEAN},
               new String[]{"1.0", "15", "str", "false"});
           DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50,
               new PartialPath("root.logTestDevice.s1"));
@@ -102,7 +112,10 @@ public class PerformanceTest {
             3000000 + " logs use " + (System.currentTimeMillis() - time) + " ms at batch size "
                 + config.getFlushWalThreshold());
 
-        logNode.delete();
+        ByteBuffer[] array = logNode.delete();
+        for (ByteBuffer byteBuffer : array) {
+          MmapUtil.clean((MappedByteBuffer) byteBuffer);
+        }
         tempRestore.delete();
         tempProcessorStore.delete();
         tempRestore.getParentFile().delete();
@@ -130,25 +143,37 @@ public class PerformanceTest {
     } catch (MetadataException ignored) {
     }
     IoTDB.metaManager
-        .createTimeseries(new PartialPath("root.logTestDevice.s1"), TSDataType.DOUBLE, TSEncoding.PLAIN,
+        .createTimeseries(new PartialPath("root.logTestDevice.s1"), TSDataType.DOUBLE,
+            TSEncoding.PLAIN,
             TSFileDescriptor.getInstance().getConfig().getCompressor(), Collections.emptyMap());
     IoTDB.metaManager
-        .createTimeseries(new PartialPath("root.logTestDevice.s2"), TSDataType.INT32, TSEncoding.PLAIN,
+        .createTimeseries(new PartialPath("root.logTestDevice.s2"), TSDataType.INT32,
+            TSEncoding.PLAIN,
             TSFileDescriptor.getInstance().getConfig().getCompressor(), Collections.emptyMap());
     IoTDB.metaManager
-        .createTimeseries(new PartialPath("root.logTestDevice.s3"), TSDataType.TEXT, TSEncoding.PLAIN,
+        .createTimeseries(new PartialPath("root.logTestDevice.s3"), TSDataType.TEXT,
+            TSEncoding.PLAIN,
             TSFileDescriptor.getInstance().getConfig().getCompressor(), Collections.emptyMap());
     IoTDB.metaManager
-        .createTimeseries(new PartialPath("root.logTestDevice.s4"), TSDataType.BOOLEAN, TSEncoding.PLAIN,
+        .createTimeseries(new PartialPath("root.logTestDevice.s4"), TSDataType.BOOLEAN,
+            TSEncoding.PLAIN,
             TSFileDescriptor.getInstance().getConfig().getCompressor(), Collections.emptyMap());
-    WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice");
+
+    ByteBuffer[] byteBuffers = new ByteBuffer[2];
+    byteBuffers[0] = ByteBuffer
+        .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+    byteBuffers[1] = ByteBuffer
+        .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+    WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice", byteBuffers);
 
     for (int i = 0; i < 1000000; i++) {
       InsertRowPlan bwInsertPlan = new InsertRowPlan(new PartialPath("root.logTestDevice"), 100,
           new String[]{"s1", "s2", "s3", "s4"},
-          new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN},
+          new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT,
+              TSDataType.BOOLEAN},
           new String[]{"1.0", "15", "str", "false"});
-      DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50, new PartialPath("root.logTestDevice.s1"));
+      DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50,
+          new PartialPath("root.logTestDevice.s1"));
 
       logNode.write(bwInsertPlan);
       logNode.write(deletePlan);
@@ -159,7 +184,10 @@ public class PerformanceTest {
       System.out.println(
           3000000 + " logs use " + (System.currentTimeMillis() - time) + "ms when recovering ");
     } finally {
-      logNode.delete();
+      ByteBuffer[] array = logNode.delete();
+      for (ByteBuffer byteBuffer : array) {
+        MmapUtil.clean((MappedByteBuffer) byteBuffer);
+      }
       tempRestore.delete();
       tempProcessorStore.delete();
       tempRestore.getParentFile().delete();
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
index 332ee6b..cdff9cf 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
@@ -19,12 +19,15 @@
 package org.apache.iotdb.db.writelog;
 
 import static junit.framework.TestCase.assertEquals;
+import static junit.framework.TestCase.assertFalse;
 import static junit.framework.TestCase.assertNotSame;
 import static junit.framework.TestCase.assertSame;
 import static junit.framework.TestCase.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -32,11 +35,11 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.MmapUtil;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 import org.apache.iotdb.db.writelog.manager.WriteLogNodeManager;
 import org.apache.iotdb.db.writelog.node.WriteLogNode;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Path;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -64,15 +67,45 @@ public class WriteLogNodeManagerTest {
   public void testGetAndDelete() throws IOException {
     String identifier = "testLogNode";
     WriteLogNodeManager manager = MultiFileLogNodeManager.getInstance();
-    WriteLogNode logNode = manager.getNode(identifier);
+    WriteLogNode logNode = manager.getNode(identifier, () -> {
+      ByteBuffer[] buffers = new ByteBuffer[2];
+      buffers[0] = ByteBuffer
+          .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+      buffers[1] = ByteBuffer
+          .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+      return buffers;
+    });
     assertEquals(identifier, logNode.getIdentifier());
 
-    WriteLogNode theSameNode = manager.getNode(identifier);
+    WriteLogNode theSameNode = manager.getNode(identifier, () -> {
+      ByteBuffer[] buffers = new ByteBuffer[2];
+      buffers[0] = ByteBuffer
+          .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+      buffers[1] = ByteBuffer
+          .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+      return buffers;
+    });
     assertSame(logNode, theSameNode);
 
-    manager.deleteNode(identifier);
-    WriteLogNode anotherNode = manager.getNode(identifier);
+    manager.deleteNode(identifier, (ByteBuffer[] array) -> {
+      for (ByteBuffer byteBuffer : array) {
+        MmapUtil.clean((MappedByteBuffer) byteBuffer);
+      }
+    });
+    WriteLogNode anotherNode = manager.getNode(identifier, () -> {
+      ByteBuffer[] buffers = new ByteBuffer[2];
+      buffers[0] = ByteBuffer
+          .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+      buffers[1] = ByteBuffer
+          .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+      return buffers;
+    });
     assertNotSame(logNode, anotherNode);
+    manager.deleteNode(identifier, (ByteBuffer[] array) -> {
+      for (ByteBuffer byteBuffer : array) {
+        MmapUtil.clean((MappedByteBuffer) byteBuffer);
+      }
+    });
   }
 
   @Test
@@ -84,17 +117,24 @@ public class WriteLogNodeManagerTest {
     File tempProcessorStore = File.createTempFile("managerTest", "processorStore");
 
     WriteLogNodeManager manager = MultiFileLogNodeManager.getInstance();
-    WriteLogNode logNode = manager
-        .getNode("root.managerTest");
+    WriteLogNode logNode = manager.getNode("root.managerTest", () -> {
+      ByteBuffer[] buffers = new ByteBuffer[2];
+      buffers[0] = ByteBuffer
+          .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+      buffers[1] = ByteBuffer
+          .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+      return buffers;
+    });
 
     InsertRowPlan bwInsertPlan = new InsertRowPlan(new PartialPath("logTestDevice"), 100,
         new String[]{"s1", "s2", "s3", "s4"},
         new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN},
         new String[]{"1.0", "15", "str", "false"});
-    DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50, new PartialPath("root.logTestDevice.s1"));
+    DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50,
+        new PartialPath("root.logTestDevice.s1"));
 
     File walFile = new File(logNode.getLogDirectory() + File.separator + "wal1");
-    assertTrue(!walFile.exists());
+    assertFalse(walFile.exists());
 
     logNode.write(bwInsertPlan);
     logNode.write(deletePlan);
@@ -102,7 +142,10 @@ public class WriteLogNodeManagerTest {
     Thread.sleep(config.getForceWalPeriodInMs() + 1000);
     assertTrue(walFile.exists());
 
-    logNode.delete();
+    ByteBuffer[] buffers = logNode.delete();
+    for (ByteBuffer byteBuffer : buffers) {
+      MmapUtil.clean((MappedByteBuffer) byteBuffer);
+    }
     config.setForceWalPeriodInMs(flushWalPeriod);
     tempRestore.delete();
     tempProcessorStore.delete();
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
index fe5610c..eadeb38 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
@@ -24,6 +24,8 @@ import static junit.framework.TestCase.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -34,6 +36,7 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.MmapUtil;
 import org.apache.iotdb.db.writelog.io.ILogReader;
 import org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode;
 import org.apache.iotdb.db.writelog.node.WriteLogNode;
@@ -68,7 +71,12 @@ public class WriteLogNodeTest {
     // then reads the logs from file
     String identifier = "root.logTestDevice";
 
-    WriteLogNode logNode = new ExclusiveWriteLogNode(identifier);
+    ByteBuffer[] byteBuffers = new ByteBuffer[2];
+    byteBuffers[0] = ByteBuffer
+        .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+    byteBuffers[1] = ByteBuffer
+        .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+    WriteLogNode logNode = new ExclusiveWriteLogNode(identifier, byteBuffers);
 
     InsertRowPlan bwInsertPlan = new InsertRowPlan(new PartialPath(identifier), 100,
         new String[]{"s1", "s2", "s3", "s4"},
@@ -96,7 +104,7 @@ public class WriteLogNodeTest {
     }
 
     InsertTabletPlan tabletPlan = new InsertTabletPlan(new PartialPath(identifier),
-      new String[]{"s1", "s2", "s3", "s4"}, dataTypes);
+        new String[]{"s1", "s2", "s3", "s4"}, dataTypes);
     tabletPlan.setTimes(times);
     tabletPlan.setColumns(columns);
     tabletPlan.setRowCount(times.length);
@@ -122,7 +130,10 @@ public class WriteLogNodeTest {
     assertEquals(newPlan.getMeasurements().length, 3);
     reader.close();
 
-    logNode.delete();
+    ByteBuffer[] array = logNode.delete();
+    for (ByteBuffer byteBuffer : array) {
+      MmapUtil.clean((MappedByteBuffer) byteBuffer);
+    }
   }
 
   @Test
@@ -131,7 +142,12 @@ public class WriteLogNodeTest {
     // then calls notifyStartFlush() and notifyEndFlush() to delete old file
     String identifier = "root.logTestDevice";
 
-    WriteLogNode logNode = new ExclusiveWriteLogNode(identifier);
+    ByteBuffer[] byteBuffers = new ByteBuffer[2];
+    byteBuffers[0] = ByteBuffer
+        .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+    byteBuffers[1] = ByteBuffer
+        .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+    WriteLogNode logNode = new ExclusiveWriteLogNode(identifier, byteBuffers);
 
     InsertRowPlan bwInsertPlan = new InsertRowPlan(new PartialPath(identifier), 100,
         new String[]{"s1", "s2", "s3", "s4"},
@@ -159,7 +175,10 @@ public class WriteLogNodeTest {
     assertFalse(logReader.hasNext());
     logReader.close();
 
-    logNode.delete();
+    ByteBuffer[] array = logNode.delete();
+    for (ByteBuffer byteBuffer : array) {
+      MmapUtil.clean((MappedByteBuffer) byteBuffer);
+    }
   }
 
   @Test
@@ -168,13 +187,19 @@ public class WriteLogNodeTest {
     int flushWalThreshold = config.getFlushWalThreshold();
     config.setFlushWalThreshold(2);
 
-    WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice");
+    ByteBuffer[] byteBuffers = new ByteBuffer[2];
+    byteBuffers[0] = ByteBuffer
+        .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+    byteBuffers[1] = ByteBuffer
+        .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+    WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice", byteBuffers);
 
     InsertRowPlan bwInsertPlan = new InsertRowPlan(new PartialPath("root.logTestDevice"), 100,
         new String[]{"s1", "s2", "s3", "s4"},
         new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN},
         new String[]{"1.0", "15", "str", "false"});
-    DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50, new PartialPath("root.logTestDevice.s1"));
+    DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50,
+        new PartialPath("root.logTestDevice.s1"));
 
     logNode.write(bwInsertPlan);
 
@@ -189,7 +214,10 @@ public class WriteLogNodeTest {
     }
     assertTrue(walFile.exists());
 
-    logNode.delete();
+    ByteBuffer[] array = logNode.delete();
+    for (ByteBuffer byteBuffer : array) {
+      MmapUtil.clean((MappedByteBuffer) byteBuffer);
+    }
     config.setFlushWalThreshold(flushWalThreshold);
   }
 
@@ -198,13 +226,19 @@ public class WriteLogNodeTest {
     // this test uses a dummy insert log node to insert a few logs and flushes them
     // then deletes the node
 
-    WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice");
+    ByteBuffer[] byteBuffers = new ByteBuffer[2];
+    byteBuffers[0] = ByteBuffer
+        .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+    byteBuffers[1] = ByteBuffer
+        .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+    WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice", byteBuffers);
 
     InsertRowPlan bwInsertPlan = new InsertRowPlan(new PartialPath("logTestDevice"), 100,
         new String[]{"s1", "s2", "s3", "s4"},
         new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN},
         new String[]{"1.0", "15", "str", "false"});
-    DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50, new PartialPath("root.logTestDevice.s1"));
+    DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50,
+        new PartialPath("root.logTestDevice.s1"));
 
     logNode.write(bwInsertPlan);
     logNode.write(deletePlan);
@@ -219,16 +253,25 @@ public class WriteLogNodeTest {
     }
 
     assertTrue(new File(logNode.getLogDirectory()).exists());
-    logNode.delete();
-    assertTrue(!new File(logNode.getLogDirectory()).exists());
+    ByteBuffer[] array = logNode.delete();
+    for (ByteBuffer byteBuffer : array) {
+      MmapUtil.clean((MappedByteBuffer) byteBuffer);
+    }
+    assertFalse(new File(logNode.getLogDirectory()).exists());
   }
 
   @Test
   public void testOverSizedWAL() throws IOException, IllegalPathException {
     // this test uses a dummy insert log node to insert an over-sized log and assert exception caught
-    WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice.oversize");
-
-    InsertRowPlan bwInsertPlan = new InsertRowPlan(new PartialPath("root.logTestDevice.oversize"), 100,
+    ByteBuffer[] byteBuffers = new ByteBuffer[2];
+    byteBuffers[0] = ByteBuffer
+        .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+    byteBuffers[1] = ByteBuffer
+        .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+    WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice.oversize", byteBuffers);
+
+    InsertRowPlan bwInsertPlan = new InsertRowPlan(new PartialPath("root.logTestDevice.oversize"),
+        100,
         new String[]{"s1", "s2", "s3", "s4"},
         new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN},
         new String[]{"1.0", "15", new String(new char[65 * 1024 * 1024]), "false"});
@@ -241,6 +284,9 @@ public class WriteLogNodeTest {
     }
     assertTrue(caught);
 
-    logNode.delete();
+    ByteBuffer[] array = logNode.delete();
+    for (ByteBuffer byteBuffer : array) {
+      MmapUtil.clean((MappedByteBuffer) byteBuffer);
+    }
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
index f7a5c63..a64c754 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
@@ -26,9 +26,12 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
@@ -50,6 +53,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.MmapUtil;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 import org.apache.iotdb.db.writelog.node.WriteLogNode;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -109,7 +113,12 @@ public class LogReplayerTest {
           versionController, tsFileResource, memTable, false);
 
       WriteLogNode node =
-          MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsFile.getName());
+          MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsFile.getName(), () -> {
+            ByteBuffer[] byteBuffers = new ByteBuffer[2];
+            byteBuffers[0] = ByteBuffer.allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+            byteBuffers[1] = ByteBuffer.allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+            return byteBuffers;
+          });
       node.write(
           new InsertRowPlan(new PartialPath("root.sg.device0"), 100, "sensor0", TSDataType.INT64,
               String.valueOf(0)));
@@ -124,7 +133,12 @@ public class LogReplayerTest {
       node.write(deletePlan);
       node.close();
 
-      replayer.replayLogs();
+      replayer.replayLogs(() -> {
+        ByteBuffer[] byteBuffers = new ByteBuffer[2];
+        byteBuffers[0] = ByteBuffer.allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+        byteBuffers[1] = ByteBuffer.allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+        return byteBuffers;
+      });
 
       for (int i = 0; i < 5; i++) {
         ReadOnlyMemChunk memChunk = memTable
@@ -175,7 +189,11 @@ public class LogReplayerTest {
       }
     } finally {
       modFile.close();
-      MultiFileLogNodeManager.getInstance().deleteNode(logNodePrefix + tsFile.getName());
+      MultiFileLogNodeManager.getInstance().deleteNode(logNodePrefix + tsFile.getName(), (ByteBuffer[] byteBuffers) -> {
+        for (ByteBuffer byteBuffer : byteBuffers) {
+          MmapUtil.clean((MappedByteBuffer) byteBuffer);
+        }
+      });
       modF.delete();
       tsFile.delete();
       tsFile.getParentFile().delete();
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java
index 94d4feb..3a0bbe6 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java
@@ -21,12 +21,15 @@ package org.apache.iotdb.db.writelog.recover;
 
 import static org.junit.Assert.assertEquals;
 
-import java.util.*;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.util.Collections;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -38,6 +41,7 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.MmapUtil;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 import org.apache.iotdb.db.writelog.node.WriteLogNode;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -87,7 +91,8 @@ public class RecoverResourceFromReaderTest {
     schema = new Schema();
     for (int i = 0; i < 10; i++) {
       for (int j = 0; j < 10; j++) {
-        PartialPath path = new PartialPath("root.sg.device" + i + IoTDBConstant.PATH_SEPARATOR + "sensor" + j);
+        PartialPath path = new PartialPath(
+            "root.sg.device" + i + IoTDBConstant.PATH_SEPARATOR + "sensor" + j);
         MeasurementSchema measurementSchema = new MeasurementSchema("sensor" + j, TSDataType.INT64,
             TSEncoding.PLAIN);
         schema.registerTimeseries(path.toTSFilePath(), measurementSchema);
@@ -99,17 +104,20 @@ public class RecoverResourceFromReaderTest {
     schema.registerTimeseries(new Path(("root.sg.device99"), ("sensor4")),
         new MeasurementSchema("sensor4", TSDataType.INT64, TSEncoding.PLAIN));
     IoTDB.metaManager
-        .createTimeseries(new PartialPath("root.sg.device99.sensor4"), TSDataType.INT64, TSEncoding.PLAIN,
+        .createTimeseries(new PartialPath("root.sg.device99.sensor4"), TSDataType.INT64,
+            TSEncoding.PLAIN,
             TSFileDescriptor.getInstance().getConfig().getCompressor(), Collections.emptyMap());
     schema.registerTimeseries(new Path(("root.sg.device99"), ("sensor2")),
         new MeasurementSchema("sensor2", TSDataType.INT64, TSEncoding.PLAIN));
     IoTDB.metaManager
-        .createTimeseries(new PartialPath("root.sg.device99.sensor2"), TSDataType.INT64, TSEncoding.PLAIN,
+        .createTimeseries(new PartialPath("root.sg.device99.sensor2"), TSDataType.INT64,
+            TSEncoding.PLAIN,
             TSFileDescriptor.getInstance().getConfig().getCompressor(), Collections.emptyMap());
     schema.registerTimeseries(new Path(("root.sg.device99"), ("sensor1")),
         new MeasurementSchema("sensor1", TSDataType.INT64, TSEncoding.PLAIN));
     IoTDB.metaManager
-        .createTimeseries(new PartialPath("root.sg.device99.sensor1"), TSDataType.INT64, TSEncoding.PLAIN,
+        .createTimeseries(new PartialPath("root.sg.device99.sensor1"), TSDataType.INT64,
+            TSEncoding.PLAIN,
             TSFileDescriptor.getInstance().getConfig().getCompressor(), Collections.emptyMap());
     writer = new TsFileWriter(tsF, schema);
 
@@ -134,7 +142,14 @@ public class RecoverResourceFromReaderTest {
     writer.flushAllChunkGroups();
     writer.getIOWriter().close();
 
-    node = MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsF.getName());
+    node = MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsF.getName(), () -> {
+      ByteBuffer[] byteBuffers = new ByteBuffer[2];
+      byteBuffers[0] = ByteBuffer
+          .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+      byteBuffers[1] = ByteBuffer
+          .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+      return byteBuffers;
+    });
     for (int i = 0; i < 10; i++) {
       for (int j = 0; j < 10; j++) {
         String[] measurements = new String[10];
@@ -145,16 +160,19 @@ public class RecoverResourceFromReaderTest {
           types[k] = TSDataType.INT64;
           values[k] = String.valueOf(k + 10);
         }
-        InsertRowPlan insertRowPlan = new InsertRowPlan(new PartialPath("root.sg.device" + j), i, measurements,
+        InsertRowPlan insertRowPlan = new InsertRowPlan(new PartialPath("root.sg.device" + j), i,
+            measurements,
             types, values);
         node.write(insertRowPlan);
       }
       node.notifyStartFlush();
     }
-    InsertRowPlan insertRowPlan = new InsertRowPlan(new PartialPath("root.sg.device99"), 1, new String[]{"sensor4"},
+    InsertRowPlan insertRowPlan = new InsertRowPlan(new PartialPath("root.sg.device99"), 1,
+        new String[]{"sensor4"},
         new TSDataType[]{TSDataType.INT64}, new String[]{"4"});
     node.write(insertRowPlan);
-    insertRowPlan = new InsertRowPlan(new PartialPath("root.sg.device99"), 300, new String[]{"sensor2"},
+    insertRowPlan = new InsertRowPlan(new PartialPath("root.sg.device99"), 300,
+        new String[]{"sensor2"},
         new TSDataType[]{TSDataType.INT64}, new String[]{"2"});
     node.write(insertRowPlan);
     node.close();
@@ -167,7 +185,10 @@ public class RecoverResourceFromReaderTest {
     EnvironmentUtils.cleanEnv();
     FileUtils.deleteDirectory(tsF.getParentFile());
     resource.close();
-    node.delete();
+    ByteBuffer[] array = node.delete();
+    for (ByteBuffer byteBuffer : array) {
+      MmapUtil.clean((MappedByteBuffer) byteBuffer);
+    }
   }
 
   @Test
@@ -183,7 +204,18 @@ public class RecoverResourceFromReaderTest {
 
     TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, versionController,
         resource, false, false);
-    performer.recover(true).close();
+    performer.recover(true, () -> {
+      ByteBuffer[] byteBuffers = new ByteBuffer[2];
+      byteBuffers[0] = ByteBuffer
+          .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+      byteBuffers[1] = ByteBuffer
+          .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+      return byteBuffers;
+    }, (ByteBuffer[] byteBuffers) -> {
+      for (ByteBuffer byteBuffer : byteBuffers) {
+        MmapUtil.clean((MappedByteBuffer) byteBuffer);
+      }
+    }).close();
     assertEquals(1, resource.getStartTime("root.sg.device99"));
     assertEquals(300, resource.getEndTime("root.sg.device99"));
     for (int i = 0; i < 10; i++) {
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
index f4ff7cf..5c3ea2d 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
@@ -21,16 +21,18 @@ package org.apache.iotdb.db.writelog.recover;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -42,6 +44,7 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.MmapUtil;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 import org.apache.iotdb.db.writelog.node.WriteLogNode;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -99,7 +102,8 @@ public class SeqTsFileRecoverTest {
     for (int i = 0; i < 10; i++) {
       for (int j = 0; j < 10; j++) {
         IoTDB.metaManager
-            .createTimeseries(new PartialPath("root.sg.device" + i + ".sensor" + j), TSDataType.INT64,
+            .createTimeseries(new PartialPath("root.sg.device" + i + ".sensor" + j),
+                TSDataType.INT64,
                 TSEncoding.PLAIN, TSFileDescriptor.getInstance().getConfig().getCompressor(),
                 Collections.emptyMap());
       }
@@ -138,7 +142,14 @@ public class SeqTsFileRecoverTest {
     writer.flushAllChunkGroups();
     writer.getIOWriter().close();
 
-    node = MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsF.getName());
+    node = MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsF.getName(), () -> {
+      ByteBuffer[] buffers = new ByteBuffer[2];
+      buffers[0] = ByteBuffer
+          .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+      buffers[1] = ByteBuffer
+          .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+      return buffers;
+    });
     for (int i = 10; i < 20; i++) {
       for (int j = 0; j < 10; j++) {
         String[] measurements = new String[10];
@@ -149,7 +160,8 @@ public class SeqTsFileRecoverTest {
           types[k] = TSDataType.INT64;
           values[k] = String.valueOf(k);
         }
-        InsertRowPlan insertPlan = new InsertRowPlan(new PartialPath("root.sg.device" + j), i, measurements, types,
+        InsertRowPlan insertPlan = new InsertRowPlan(new PartialPath("root.sg.device" + j), i,
+            measurements, types,
             values);
         node.write(insertPlan);
       }
@@ -164,14 +176,28 @@ public class SeqTsFileRecoverTest {
     EnvironmentUtils.cleanEnv();
     FileUtils.deleteDirectory(tsF.getParentFile());
     resource.close();
-    node.delete();
+    ByteBuffer[] buffers = node.delete();
+    for (ByteBuffer byteBuffer : buffers) {
+      MmapUtil.clean((MappedByteBuffer) byteBuffer);
+    }
   }
 
   @Test
   public void testNonLastRecovery() throws StorageGroupProcessorException, IOException {
     TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, versionController,
         resource, false, false);
-    RestorableTsFileIOWriter writer = performer.recover(true);
+    RestorableTsFileIOWriter writer = performer.recover(true, () -> {
+      ByteBuffer[] buffers = new ByteBuffer[2];
+      buffers[0] = ByteBuffer
+          .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+      buffers[1] = ByteBuffer
+          .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+      return buffers;
+    }, (ByteBuffer[] array) -> {
+      for (ByteBuffer byteBuffer : array) {
+        MmapUtil.clean((MappedByteBuffer) byteBuffer);
+      }
+    });
     assertFalse(writer.canWrite());
     writer.close();
 
@@ -220,7 +246,18 @@ public class SeqTsFileRecoverTest {
   public void testLastRecovery() throws StorageGroupProcessorException, IOException {
     TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, versionController,
         resource, false, true);
-    RestorableTsFileIOWriter writer = performer.recover(true);
+    RestorableTsFileIOWriter writer = performer.recover(true, () -> {
+      ByteBuffer[] buffers = new ByteBuffer[2];
+      buffers[0] = ByteBuffer
+          .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+      buffers[1] = ByteBuffer
+          .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+      return buffers;
+    }, (ByteBuffer[] array) -> {
+      for (ByteBuffer byteBuffer : array) {
+        MmapUtil.clean((MappedByteBuffer) byteBuffer);
+      }
+    });
 
     writer.makeMetadataVisible();
     assertEquals(11, writer.getMetadatasForQuery().size());
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
index 8caa93a..2d4562f 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
@@ -23,9 +23,12 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
 import java.util.Collections;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -39,6 +42,7 @@ import org.apache.iotdb.db.query.reader.chunk.ChunkDataIterator;
 import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.MmapUtil;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 import org.apache.iotdb.db.writelog.node.WriteLogNode;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -142,7 +146,14 @@ public class UnseqTsFileRecoverTest {
     writer.flushAllChunkGroups();
     writer.getIOWriter().close();
 
-    node = MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsF.getName());
+    node = MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsF.getName(), () -> {
+      ByteBuffer[] buffers = new ByteBuffer[2];
+      buffers[0] = ByteBuffer
+          .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+      buffers[1] = ByteBuffer
+          .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+      return buffers;
+    });
     for (int i = 0; i < 10; i++) {
       for (int j = 0; j < 10; j++) {
         String[] measurements = new String[10];
@@ -172,7 +183,10 @@ public class UnseqTsFileRecoverTest {
   public void tearDown() throws IOException, StorageEngineException {
     FileUtils.deleteDirectory(tsF.getParentFile());
     resource.close();
-    node.delete();
+    ByteBuffer[] array = node.delete();
+    for (ByteBuffer byteBuffer : array) {
+      MmapUtil.clean((MappedByteBuffer) byteBuffer);
+    }
     EnvironmentUtils.cleanEnv();
   }
 
@@ -180,7 +194,18 @@ public class UnseqTsFileRecoverTest {
   public void test() throws StorageGroupProcessorException, IOException {
     TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix,
         versionController, resource, false, false);
-    performer.recover(true).close();
+    performer.recover(true, () -> {
+      ByteBuffer[] buffers = new ByteBuffer[2];
+      buffers[0] = ByteBuffer
+          .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+      buffers[1] = ByteBuffer
+          .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+      return buffers;
+    }, (ByteBuffer[] array) -> {
+      for (ByteBuffer byteBuffer : array) {
+        MmapUtil.clean((MappedByteBuffer) byteBuffer);
+      }
+    }).close();
 
     assertEquals(1, resource.getStartTime("root.sg.device99"));
     assertEquals(300, resource.getEndTime("root.sg.device99"));