You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/26 01:16:54 UTC

[iotdb] branch master updated: [IOTDB-5925] Pipe: WAL Resource Management (#9948)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 94212b64c74 [IOTDB-5925] Pipe: WAL Resource Management (#9948)
94212b64c74 is described below

commit 94212b64c74d94e268ed71ad7ac13e60cfcc36d1
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Fri May 26 09:16:48 2023 +0800

    [IOTDB-5925] Pipe: WAL Resource Management (#9948)
---
 .../iotdb/commons/concurrent/ThreadName.java       |   1 +
 .../db/engine/storagegroup/TsFileProcessor.java    |  33 +++--
 .../listener/PipeInsertionDataNodeListener.java    |  10 +-
 .../impl/iotdb/v1/IoTDBThriftConnectorV1.java      |   3 +-
 .../core/event/impl/PipeTabletInsertionEvent.java  |  53 ++++---
 .../realtime/PipeRealtimeCollectEventFactory.java  |   5 +-
 .../db/pipe/resource/PipeResourceManager.java      |   9 ++
 .../{ => file}/PipeFileResourceManager.java        |   2 +-
 .../db/pipe/resource/wal/PipeWALResource.java      | 162 +++++++++++++++++++++
 .../pipe/resource/wal/PipeWALResourceManager.java  | 111 ++++++++++++++
 .../db/pipe/task/subtask/PipeConnectorSubtask.java |   9 +-
 .../org/apache/iotdb/db/wal/buffer/WALBuffer.java  |   8 +-
 .../java/org/apache/iotdb/db/wal/node/WALNode.java |   4 +-
 .../{WALPipeHandler.java => WALEntryHandler.java}  |  22 ++-
 .../db/wal/utils/listener/WALFlushListener.java    |  10 +-
 .../core/collector/PipeRealtimeCollectTest.java    |   6 +
 .../pipe/resource/PipeFileResourceManagerTest.java |   1 +
 ...peHandlerTest.java => WALEntryHandlerTest.java} |  18 +--
 .../iotdb/db/wal/utils/WALInsertNodeCacheTest.java |   8 +-
 19 files changed, 409 insertions(+), 66 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index d156287e1f3..718c5a6bfc1 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -72,6 +72,7 @@ public enum ThreadName {
   PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"),
   PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL("Pipe-SubTask-Callback-Executor-Pool"),
   PIPE_META_SYNC_SERVICE("Pipe-Meta-Sync-Service"),
+  PIPE_WAL_RESOURCE_TTL_CHECKER_SERVICE("Pipe-WAL-Resource-TTL-Checker-Service"),
   ;
 
   private final String name;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 55f06629258..f89f28f9b2f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -258,8 +258,9 @@ public class TsFileProcessor {
     }
 
     long startTime = System.nanoTime();
+    WALFlushListener walFlushListener;
     try {
-      WALFlushListener walFlushListener = walNode.log(workMemTable.getMemTableId(), insertRowNode);
+      walFlushListener = walNode.log(workMemTable.getMemTableId(), insertRowNode);
       if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
         throw walFlushListener.getCause();
       }
@@ -277,17 +278,20 @@ public class TsFileProcessor {
     }
 
     startTime = System.nanoTime();
+
+    PipeInsertionDataNodeListener.getInstance()
+        .listenToInsertNode(
+            dataRegionInfo.getDataRegion().getDataRegionId(),
+            walFlushListener.getWalEntryHandler(),
+            insertRowNode,
+            tsFileResource);
+
     if (insertRowNode.isAligned()) {
       workMemTable.insertAlignedRow(insertRowNode);
     } else {
       workMemTable.insert(insertRowNode);
     }
 
-    // collect plan node in pipe
-    PipeInsertionDataNodeListener.getInstance()
-        .listenToInsertNode(
-            dataRegionInfo.getDataRegion().getDataRegionId(), insertRowNode, tsFileResource);
-
     // update start time of this memtable
     tsFileResource.updateStartTime(
         insertRowNode.getDeviceID().toStringID(), insertRowNode.getTime());
@@ -358,9 +362,9 @@ public class TsFileProcessor {
     }
 
     long startTime = System.nanoTime();
+    WALFlushListener walFlushListener;
     try {
-      WALFlushListener walFlushListener =
-          walNode.log(workMemTable.getMemTableId(), insertTabletNode, start, end);
+      walFlushListener = walNode.log(workMemTable.getMemTableId(), insertTabletNode, start, end);
       if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
         throw walFlushListener.getCause();
       }
@@ -377,6 +381,14 @@ public class TsFileProcessor {
     }
 
     startTime = System.nanoTime();
+
+    PipeInsertionDataNodeListener.getInstance()
+        .listenToInsertNode(
+            dataRegionInfo.getDataRegion().getDataRegionId(),
+            walFlushListener.getWalEntryHandler(),
+            insertTabletNode,
+            tsFileResource);
+
     try {
       if (insertTabletNode.isAligned()) {
         workMemTable.insertAlignedTablet(insertTabletNode, start, end);
@@ -393,11 +405,6 @@ public class TsFileProcessor {
       results[i] = RpcUtils.SUCCESS_STATUS;
     }
 
-    // collect plan node in pipe
-    PipeInsertionDataNodeListener.getInstance()
-        .listenToInsertNode(
-            dataRegionInfo.getDataRegion().getDataRegionId(), insertTabletNode, tsFileResource);
-
     tsFileResource.updateStartTime(
         insertTabletNode.getDeviceID().toStringID(), insertTabletNode.getTimes()[start]);
     // for sequence tsfile, we update the endTime only when the file is prepared to be closed.
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
index 778a583f1c7..d39a2fb9dc4 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.assigner.PipeDataRegionAssigner;
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEventFactory;
+import org.apache.iotdb.db.wal.utils.WALEntryHandler;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -84,9 +85,11 @@ public class PipeInsertionDataNodeListener {
     assigner.publishToAssign(PipeRealtimeCollectEventFactory.createCollectEvent(tsFileResource));
   }
 
-  // TODO: check whether the method is called on the right place.
   public void listenToInsertNode(
-      String dataRegionId, InsertNode insertNode, TsFileResource tsFileResource) {
+      String dataRegionId,
+      WALEntryHandler walEntryHandler,
+      InsertNode insertNode,
+      TsFileResource tsFileResource) {
     final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId);
 
     // only events from registered data region will be collected
@@ -95,7 +98,8 @@ public class PipeInsertionDataNodeListener {
     }
 
     assigner.publishToAssign(
-        PipeRealtimeCollectEventFactory.createCollectEvent(insertNode, tsFileResource));
+        PipeRealtimeCollectEventFactory.createCollectEvent(
+            walEntryHandler, insertNode, tsFileResource));
   }
 
   /////////////////////////////// singleton ///////////////////////////////
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
index 6b414c03b36..65837f31d42 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransfe
 import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferInsertNodeReq;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
@@ -130,7 +131,7 @@ public class IoTDBThriftConnectorV1 implements PipeConnector {
   }
 
   private void doTransfer(PipeTabletInsertionEvent pipeTabletInsertionEvent)
-      throws PipeException, TException {
+      throws PipeException, TException, WALPipeException {
     final TPipeTransferResp resp =
         client.pipeTransfer(
             PipeTransferInsertNodeReq.toTPipeTransferReq(pipeTabletInsertionEvent.getInsertNode()));
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
index 62c599b66c1..99d9da055ee 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
@@ -21,28 +21,32 @@ package org.apache.iotdb.db.pipe.core.event.impl;
 
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.wal.exception.WALPipeException;
+import org.apache.iotdb.db.wal.utils.WALEntryHandler;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 
 public class PipeTabletInsertionEvent implements TabletInsertionEvent, EnrichedEvent {
 
-  private final InsertNode insertNode;
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTabletInsertionEvent.class);
 
-  private final AtomicInteger referenceCount;
+  private final WALEntryHandler walEntryHandler;
 
-  public PipeTabletInsertionEvent(InsertNode insertNode) {
-    this.insertNode = insertNode;
-    this.referenceCount = new AtomicInteger(0);
+  public PipeTabletInsertionEvent(WALEntryHandler walEntryHandler) {
+    this.walEntryHandler = walEntryHandler;
   }
 
-  public InsertNode getInsertNode() {
-    return insertNode;
+  public InsertNode getInsertNode() throws WALPipeException {
+    return walEntryHandler.getValue();
   }
 
   @Override
@@ -62,26 +66,41 @@ public class PipeTabletInsertionEvent implements TabletInsertionEvent, EnrichedE
 
   @Override
   public boolean increaseReferenceCount(String holderMessage) {
-    // TODO: use WALPipeHandler pinMemtable
-    referenceCount.incrementAndGet();
-    return true;
+    try {
+      PipeResourceManager.wal().pin(walEntryHandler.getMemTableId(), walEntryHandler);
+      return true;
+    } catch (Exception e) {
+      LOGGER.warn(
+          String.format(
+              "Increase reference count for memtable %d error. Holder Message: %s",
+              walEntryHandler.getMemTableId(), holderMessage),
+          e);
+      return false;
+    }
   }
 
   @Override
   public boolean decreaseReferenceCount(String holderMessage) {
-    // TODO: use WALPipeHandler unpinMemetable
-    referenceCount.decrementAndGet();
-    return true;
+    try {
+      PipeResourceManager.wal().unpin(walEntryHandler.getMemTableId());
+      return true;
+    } catch (Exception e) {
+      LOGGER.warn(
+          String.format(
+              "Decrease reference count for memtable %d error. Holder Message: %s",
+              walEntryHandler.getMemTableId(), holderMessage),
+          e);
+      return false;
+    }
   }
 
   @Override
   public int getReferenceCount() {
-    // TODO: use WALPipeHandler unpinMemetable
-    return referenceCount.get();
+    return PipeResourceManager.wal().getReferenceCount(walEntryHandler.getMemTableId());
   }
 
   @Override
   public String toString() {
-    return "PipeTabletInsertionEvent{" + "insertNode=" + insertNode + '}';
+    return "PipeTabletInsertionEvent{" + "walEntryHandler=" + walEntryHandler + '}';
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
index 4c98c5193be..a4b453e4322 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.wal.utils.WALEntryHandler;
 
 public class PipeRealtimeCollectEventFactory {
 
@@ -34,9 +35,9 @@ public class PipeRealtimeCollectEventFactory {
   }
 
   public static PipeRealtimeCollectEvent createCollectEvent(
-      InsertNode node, TsFileResource resource) {
+      WALEntryHandler walEntryHandler, InsertNode insertNode, TsFileResource resource) {
     return TS_FILE_EPOCH_MANAGER.bindPipeTabletInsertionEvent(
-        new PipeTabletInsertionEvent(node), node, resource);
+        new PipeTabletInsertionEvent(walEntryHandler), insertNode, resource);
   }
 
   private PipeRealtimeCollectEventFactory() {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
index 61b4e61a04e..43bddd872f6 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
@@ -19,18 +19,27 @@
 
 package org.apache.iotdb.db.pipe.resource;
 
+import org.apache.iotdb.db.pipe.resource.file.PipeFileResourceManager;
+import org.apache.iotdb.db.pipe.resource.wal.PipeWALResourceManager;
+
 public class PipeResourceManager {
 
   private final PipeFileResourceManager pipeFileResourceManager;
+  private final PipeWALResourceManager pipeWALResourceManager;
 
   public static PipeFileResourceManager file() {
     return PipeResourceManagerHolder.INSTANCE.pipeFileResourceManager;
   }
 
+  public static PipeWALResourceManager wal() {
+    return PipeResourceManagerHolder.INSTANCE.pipeWALResourceManager;
+  }
+
   ///////////////////////////// SINGLETON /////////////////////////////
 
   private PipeResourceManager() {
     pipeFileResourceManager = new PipeFileResourceManager();
+    pipeWALResourceManager = new PipeWALResourceManager();
   }
 
   private static class PipeResourceManagerHolder {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
similarity index 99%
rename from server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManager.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
index e7d961b3c9f..942ab600536 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.resource;
+package org.apache.iotdb.db.pipe.resource.file;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.utils.FileUtils;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
new file mode 100644
index 00000000000..9ccb946ae45
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.wal;
+
+import org.apache.iotdb.db.wal.exception.MemTablePinException;
+import org.apache.iotdb.db.wal.utils.WALEntryHandler;
+import org.apache.iotdb.pipe.api.exception.PipeRuntimeCriticalException;
+import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class PipeWALResource implements AutoCloseable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeWALResource.class);
+
+  private final WALEntryHandler walEntryHandler;
+
+  private final AtomicInteger referenceCount;
+
+  // TODO: make this configurable
+  public static final long MIN_TIME_TO_LIVE_IN_MS = 1000 * 60;
+  private final AtomicLong lastLogicalPinTime;
+  private final AtomicBoolean isPhysicallyPinned;
+
+  public PipeWALResource(WALEntryHandler walEntryHandler) {
+    this.walEntryHandler = walEntryHandler;
+
+    referenceCount = new AtomicInteger(0);
+
+    lastLogicalPinTime = new AtomicLong(0);
+    isPhysicallyPinned = new AtomicBoolean(false);
+  }
+
+  public void pin() throws PipeRuntimeNonCriticalException {
+    if (referenceCount.get() == 0) {
+      if (!isPhysicallyPinned.get()) {
+        try {
+          walEntryHandler.pinMemTable();
+        } catch (MemTablePinException e) {
+          throw new PipeRuntimeNonCriticalException(
+              String.format(
+                  "failed to pin wal %d, because %s",
+                  walEntryHandler.getMemTableId(), e.getMessage()));
+        }
+        isPhysicallyPinned.set(true);
+        LOGGER.info("wal {} is pinned by pipe engine", walEntryHandler.getMemTableId());
+      } // else means the wal is already pinned, do nothing
+
+      // no matter the wal is pinned or not, update the last pin time
+      lastLogicalPinTime.set(System.currentTimeMillis());
+    }
+
+    referenceCount.incrementAndGet();
+  }
+
+  public void unpin() throws PipeRuntimeNonCriticalException {
+    final int finalReferenceCount = referenceCount.get();
+
+    if (finalReferenceCount == 1) {
+      unpinPhysicallyIfOutOfTimeToLive();
+    } else if (finalReferenceCount < 1) {
+      throw new PipeRuntimeCriticalException(
+          String.format(
+              "wal %d is unpinned more than pinned, this should not happen",
+              walEntryHandler.getMemTableId()));
+    }
+
+    referenceCount.decrementAndGet();
+  }
+
+  /**
+   * Invalidate the wal if it is unpinned and out of time to live.
+   *
+   * @return true if the wal is invalidated, false otherwise
+   */
+  public boolean invalidateIfPossible() {
+    if (referenceCount.get() > 0) {
+      return false;
+    }
+
+    // referenceCount.get() == 0
+    return unpinPhysicallyIfOutOfTimeToLive();
+  }
+
+  /**
+   * Unpin the wal if it is out of time to live.
+   *
+   * @return true if the wal is unpinned physically (then it can be invalidated), false otherwise
+   */
+  private boolean unpinPhysicallyIfOutOfTimeToLive() {
+    if (isPhysicallyPinned.get()) {
+      if (System.currentTimeMillis() - lastLogicalPinTime.get() > MIN_TIME_TO_LIVE_IN_MS) {
+        try {
+          walEntryHandler.unpinMemTable();
+        } catch (MemTablePinException e) {
+          throw new PipeRuntimeNonCriticalException(
+              String.format(
+                  "failed to unpin wal %d, because %s",
+                  walEntryHandler.getMemTableId(), e.getMessage()));
+        }
+        isPhysicallyPinned.set(false);
+        LOGGER.info(
+            "wal {} is unpinned by pipe engine when checking time to live",
+            walEntryHandler.getMemTableId());
+        return true;
+      } else {
+        return false;
+      }
+    } else {
+      LOGGER.info(
+          "wal {} is not pinned physically when checking time to live",
+          walEntryHandler.getMemTableId());
+      return true;
+    }
+  }
+
+  @Override
+  public void close() {
+    if (isPhysicallyPinned.get()) {
+      try {
+        walEntryHandler.unpinMemTable();
+      } catch (MemTablePinException e) {
+        LOGGER.error(
+            "failed to unpin wal {} when closing pipe wal resource, because {}",
+            walEntryHandler.getMemTableId(),
+            e.getMessage());
+      }
+      isPhysicallyPinned.set(false);
+      LOGGER.info(
+          "wal {} is unpinned by pipe engine when closing pipe wal resource",
+          walEntryHandler.getMemTableId());
+    }
+
+    referenceCount.set(0);
+  }
+
+  public int getReferenceCount() {
+    return referenceCount.get();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
new file mode 100644
index 00000000000..18284de32f4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -0,0 +1,111 @@
+/*  * Licensed to the Apache Software Foundation (ASF) under one  * or more contributor license agreements.  See the NOTICE file  * distributed with this work for additional information  * regarding copyright ownership.  The ASF licenses this file  * to you under the Apache License, Version 2.0 (the  * "License"); you may not use this file except in compliance  * with the License.  You may obtain a copy of the License at  *  *     http://www.apache.org/licenses/LICENSE-2.0  *  * Unless r [...]
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.db.wal.utils.WALEntryHandler;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class PipeWALResourceManager implements AutoCloseable {
+
+  private final Map<Long, PipeWALResource> memtableIdToPipeWALResourceMap;
+
+  private static final int SEGMENT_LOCK_COUNT = 32;
+  private final ReentrantLock[] memtableIdSegmentLocks;
+
+  private static final ScheduledExecutorService PIPE_WAL_RESOURCE_TTL_CHECKER =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+          ThreadName.PIPE_WAL_RESOURCE_TTL_CHECKER_SERVICE.getName());
+  private final ScheduledFuture<?> ttlCheckerFuture;
+
+  public PipeWALResourceManager() {
+    memtableIdToPipeWALResourceMap = new HashMap<>();
+
+    memtableIdSegmentLocks = new ReentrantLock[SEGMENT_LOCK_COUNT];
+    for (int i = 0; i < SEGMENT_LOCK_COUNT; i++) {
+      memtableIdSegmentLocks[i] = new ReentrantLock();
+    }
+
+    ttlCheckerFuture =
+        ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+            PIPE_WAL_RESOURCE_TTL_CHECKER,
+            () -> {
+              for (final long memtableId : memtableIdToPipeWALResourceMap.keySet()) {
+                final ReentrantLock lock =
+                    memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)];
+
+                lock.lock();
+                try {
+                  if (memtableIdToPipeWALResourceMap.get(memtableId).invalidateIfPossible()) {
+                    memtableIdToPipeWALResourceMap.remove(memtableId);
+                  }
+                } finally {
+                  lock.unlock();
+                }
+              }
+            },
+            PipeWALResource.MIN_TIME_TO_LIVE_IN_MS,
+            PipeWALResource.MIN_TIME_TO_LIVE_IN_MS,
+            TimeUnit.MILLISECONDS);
+  }
+
+  public void pin(long memtableId, WALEntryHandler walEntryHandler) {
+    final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)];
+
+    lock.lock();
+    try {
+      memtableIdToPipeWALResourceMap
+          .computeIfAbsent(memtableId, id -> new PipeWALResource(walEntryHandler))
+          .pin();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  public void unpin(long memtableId) {
+    final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)];
+
+    lock.lock();
+    try {
+      memtableIdToPipeWALResourceMap.get(memtableId).unpin();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (ttlCheckerFuture != null) {
+      ttlCheckerFuture.cancel(true);
+    }
+
+    for (final long memtableId : memtableIdToPipeWALResourceMap.keySet()) {
+      final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)];
+
+      lock.lock();
+      try {
+        memtableIdToPipeWALResourceMap.get(memtableId).close();
+        memtableIdToPipeWALResourceMap.remove(memtableId);
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+
+  public int getReferenceCount(long memtableId) {
+    final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)];
+
+    lock.lock();
+    try {
+      return memtableIdToPipeWALResourceMap.get(memtableId).getReferenceCount();
+    } finally {
+      lock.unlock();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index 742b2230fa6..df0ea7d1fcd 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -39,6 +39,9 @@ public class PipeConnectorSubtask extends PipeSubtask {
   private final ListenableBlockingPendingQueue<Event> inputPendingQueue;
   private final PipeConnector outputPipeConnector;
 
+  private static final int HEARTBEAT_CHECK_INTERVAL = 1000;
+  private int executeOnceInvokedTimes;
+
   /** @param taskID connectorAttributeSortedString */
   public PipeConnectorSubtask(
       String taskID,
@@ -47,13 +50,15 @@ public class PipeConnectorSubtask extends PipeSubtask {
     super(taskID);
     this.inputPendingQueue = inputPendingQueue;
     this.outputPipeConnector = outputPipeConnector;
+    executeOnceInvokedTimes = 0;
   }
 
   @Override
   protected synchronized boolean executeOnce() {
     try {
-      // TODO: reduce the frequency of heartbeat
-      outputPipeConnector.heartbeat();
+      if (executeOnceInvokedTimes++ % HEARTBEAT_CHECK_INTERVAL == 0) {
+        outputPipeConnector.heartbeat();
+      }
     } catch (Exception e) {
       throw new PipeConnectionException(
           "PipeConnector: failed to connect to the target system.", e);
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
index bd4a5dd5f13..9aff438a835 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
@@ -256,7 +256,7 @@ public class WALBuffer extends AbstractWALBuffer {
       // update related info
       totalSize += size;
       info.metaData.add(size, searchIndex);
-      walEntry.getWalFlushListener().getWalPipeHandler().setSize(size);
+      walEntry.getWalFlushListener().getWalEntryHandler().setSize(size);
       info.fsyncListeners.add(walEntry.getWalFlushListener());
     }
 
@@ -495,9 +495,9 @@ public class WALBuffer extends AbstractWALBuffer {
       if (forceSuccess) {
         for (WALFlushListener fsyncListener : info.fsyncListeners) {
           fsyncListener.succeed();
-          if (fsyncListener.getWalPipeHandler() != null) {
-            fsyncListener.getWalPipeHandler().setEntryPosition(walFileVersionId, position);
-            position += fsyncListener.getWalPipeHandler().getSize();
+          if (fsyncListener.getWalEntryHandler() != null) {
+            fsyncListener.getWalEntryHandler().setEntryPosition(walFileVersionId, position);
+            position += fsyncListener.getWalEntryHandler().getSize();
           }
         }
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index 4153c3c49f5..3eacc838cec 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -150,8 +150,8 @@ public class WALNode implements IWALNode {
   private WALFlushListener log(WALEntry walEntry) {
     buffer.write(walEntry);
     // set handler for pipe
-    walEntry.getWalFlushListener().getWalPipeHandler().setMemTableId(walEntry.getMemTableId());
-    walEntry.getWalFlushListener().getWalPipeHandler().setWalNode(this);
+    walEntry.getWalFlushListener().getWalEntryHandler().setMemTableId(walEntry.getMemTableId());
+    walEntry.getWalFlushListener().getWalEntryHandler().setWalNode(this);
     return walEntry.getWalFlushListener();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALEntryHandler.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java
rename to server/src/main/java/org/apache/iotdb/db/wal/utils/WALEntryHandler.java
index abdb4771a93..e8849dc6536 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALEntryHandler.java
@@ -31,8 +31,8 @@ import org.slf4j.LoggerFactory;
  * This handler is used by the Pipe to find the corresponding insert node. Besides, it can try to
  * pin/unpin the wal entries by the memTable id.
  */
-public class WALPipeHandler {
-  private static final Logger logger = LoggerFactory.getLogger(WALPipeHandler.class);
+public class WALEntryHandler {
+  private static final Logger logger = LoggerFactory.getLogger(WALEntryHandler.class);
 
   private long memTableId = -1;
   /** cached value, null after this value is flushed to wal successfully */
@@ -42,7 +42,7 @@ public class WALPipeHandler {
   /** wal node, null when wal is disabled */
   private WALNode walNode = null;
 
-  public WALPipeHandler(WALEntryValue value) {
+  public WALEntryHandler(WALEntryValue value) {
     this.value = value;
   }
 
@@ -101,6 +101,10 @@ public class WALPipeHandler {
     }
   }
 
+  public long getMemTableId() {
+    return memTableId;
+  }
+
   public void setMemTableId(long memTableId) {
     this.memTableId = memTableId;
   }
@@ -129,4 +133,16 @@ public class WALPipeHandler {
   public void setSize(int size) {
     this.walEntryPosition.setSize(size);
   }
+
+  @Override
+  public String toString() {
+    return "WALEntryHandler{"
+        + "memTableId="
+        + memTableId
+        + ", value="
+        + value
+        + ", walEntryPosition="
+        + walEntryPosition
+        + '}';
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java
index 1b468635899..2fecd1f585c 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java
@@ -19,19 +19,19 @@
 package org.apache.iotdb.db.wal.utils.listener;
 
 import org.apache.iotdb.db.wal.buffer.WALEntryValue;
-import org.apache.iotdb.db.wal.utils.WALPipeHandler;
+import org.apache.iotdb.db.wal.utils.WALEntryHandler;
 
 /** This class helps judge whether wal is flushed to the storage device. */
 public class WALFlushListener extends AbstractResultListener {
   // handler for pipeline, only exists then value is InsertNode
-  private final WALPipeHandler walPipeHandler;
+  private final WALEntryHandler walEntryHandler;
 
   public WALFlushListener(boolean wait, WALEntryValue value) {
     super(wait);
-    walPipeHandler = new WALPipeHandler(value);
+    walEntryHandler = new WALEntryHandler(value);
   }
 
-  public WALPipeHandler getWalPipeHandler() {
-    return walPipeHandler;
+  public WALEntryHandler getWalEntryHandler() {
+    return walEntryHandler;
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index 7cd705af588..45b0cf504b2 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCo
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
 import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
+import org.apache.iotdb.db.wal.utils.WALEntryHandler;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -57,6 +58,8 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
+import static org.mockito.Mockito.mock;
+
 public class PipeRealtimeCollectTest {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeCollectTest.class);
@@ -229,6 +232,7 @@ public class PipeRealtimeCollectTest {
   }
 
   private Future<?> write2DataRegion(int writeNum, String dataRegionId, int startNum) {
+
     File dataRegionDir =
         new File(tsFileDir.getPath() + File.separator + dataRegionId + File.separator + "0");
     boolean ignored = dataRegionDir.mkdirs();
@@ -249,6 +253,7 @@ public class PipeRealtimeCollectTest {
             PipeInsertionDataNodeListener.getInstance()
                 .listenToInsertNode(
                     dataRegionId,
+                    mock(WALEntryHandler.class),
                     new InsertRowNode(
                         new PlanNodeId(String.valueOf(i)),
                         new PartialPath(device),
@@ -262,6 +267,7 @@ public class PipeRealtimeCollectTest {
             PipeInsertionDataNodeListener.getInstance()
                 .listenToInsertNode(
                     dataRegionId,
+                    mock(WALEntryHandler.class),
                     new InsertRowNode(
                         new PlanNodeId(String.valueOf(i)),
                         new PartialPath(device),
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java
index b2441aa9d9f..ef86b0db285 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.pipe.resource.file.PipeFileResourceManager;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.write.TsFileWriter;
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/node/WALPipeHandlerTest.java b/server/src/test/java/org/apache/iotdb/db/wal/node/WALEntryHandlerTest.java
similarity index 94%
rename from server/src/test/java/org/apache/iotdb/db/wal/node/WALPipeHandlerTest.java
rename to server/src/test/java/org/apache/iotdb/db/wal/node/WALEntryHandlerTest.java
index 77c28c6d333..d34ca54cae8 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/node/WALPipeHandlerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/node/WALEntryHandlerTest.java
@@ -33,8 +33,8 @@ import org.apache.iotdb.db.wal.buffer.WALEntry;
 import org.apache.iotdb.db.wal.checkpoint.CheckpointManager;
 import org.apache.iotdb.db.wal.checkpoint.MemTableInfo;
 import org.apache.iotdb.db.wal.exception.MemTablePinException;
+import org.apache.iotdb.db.wal.utils.WALEntryHandler;
 import org.apache.iotdb.db.wal.utils.WALMode;
-import org.apache.iotdb.db.wal.utils.WALPipeHandler;
 import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
@@ -49,7 +49,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-public class WALPipeHandlerTest {
+public class WALEntryHandlerTest {
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private static final String identifier = String.valueOf(Integer.MAX_VALUE);
   private static final String logDirectory = TestConstant.BASE_OUTPUT_PATH.concat("wal-test");
@@ -85,7 +85,7 @@ public class WALPipeHandlerTest {
             memTable.getMemTableId(), getInsertRowNode(devicePath, System.currentTimeMillis()));
     walNode.onMemTableFlushed(memTable);
     // pin flushed memTable
-    WALPipeHandler handler = flushListener.getWalPipeHandler();
+    WALEntryHandler handler = flushListener.getWalEntryHandler();
     handler.pinMemTable();
   }
 
@@ -97,7 +97,7 @@ public class WALPipeHandlerTest {
     node1.setSearchIndex(1);
     WALFlushListener flushListener = walNode.log(memTable.getMemTableId(), node1);
     // pin memTable
-    WALPipeHandler handler = flushListener.getWalPipeHandler();
+    WALEntryHandler handler = flushListener.getWalEntryHandler();
     handler.pinMemTable();
     walNode.onMemTableFlushed(memTable);
     // roll wal file
@@ -128,7 +128,7 @@ public class WALPipeHandlerTest {
             memTable.getMemTableId(), getInsertRowNode(devicePath, System.currentTimeMillis()));
     walNode.onMemTableFlushed(memTable);
     // pin flushed memTable
-    WALPipeHandler handler = flushListener.getWalPipeHandler();
+    WALEntryHandler handler = flushListener.getWalEntryHandler();
     handler.unpinMemTable();
   }
 
@@ -139,7 +139,7 @@ public class WALPipeHandlerTest {
     WALFlushListener flushListener =
         walNode.log(
             memTable.getMemTableId(), getInsertRowNode(devicePath, System.currentTimeMillis()));
-    WALPipeHandler handler = flushListener.getWalPipeHandler();
+    WALEntryHandler handler = flushListener.getWalEntryHandler();
     // pin twice
     handler.pinMemTable();
     handler.pinMemTable();
@@ -164,7 +164,7 @@ public class WALPipeHandlerTest {
     node1.setSearchIndex(1);
     WALFlushListener flushListener = walNode.log(memTable.getMemTableId(), node1);
     // pin memTable
-    WALPipeHandler handler = flushListener.getWalPipeHandler();
+    WALEntryHandler handler = flushListener.getWalEntryHandler();
     handler.pinMemTable();
     walNode.onMemTableFlushed(memTable);
     // roll wal file
@@ -193,7 +193,7 @@ public class WALPipeHandlerTest {
     node1.setSearchIndex(1);
     WALFlushListener flushListener = walNode.log(memTable.getMemTableId(), node1);
     // pin memTable
-    WALPipeHandler handler = flushListener.getWalPipeHandler();
+    WALEntryHandler handler = flushListener.getWalEntryHandler();
     handler.pinMemTable();
     walNode.onMemTableFlushed(memTable);
     assertEquals(node1, handler.getValue());
@@ -207,7 +207,7 @@ public class WALPipeHandlerTest {
     node1.setSearchIndex(1);
     WALFlushListener flushListener = walNode.log(memTable.getMemTableId(), node1);
     // pin memTable
-    WALPipeHandler handler = flushListener.getWalPipeHandler();
+    WALEntryHandler handler = flushListener.getWalEntryHandler();
     handler.pinMemTable();
     walNode.onMemTableFlushed(memTable);
     // wait until wal flushed
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCacheTest.java b/server/src/test/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCacheTest.java
index 98459619a70..1e5dc245cc9 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCacheTest.java
@@ -79,7 +79,7 @@ public class WALInsertNodeCacheTest {
     InsertRowNode node1 = getInsertRowNode(devicePath, System.currentTimeMillis());
     node1.setSearchIndex(1);
     WALFlushListener flushListener = walNode.log(memTable.getMemTableId(), node1);
-    WALEntryPosition position = flushListener.getWalPipeHandler().getWalEntryPosition();
+    WALEntryPosition position = flushListener.getWalEntryHandler().getWalEntryPosition();
     // wait until wal flushed
     while (!walNode.isAllWALEntriesConsumed() || !position.canRead()) {
       Thread.sleep(50);
@@ -96,18 +96,18 @@ public class WALInsertNodeCacheTest {
     InsertRowNode node1 = getInsertRowNode(devicePath, System.currentTimeMillis());
     node1.setSearchIndex(1);
     WALFlushListener flushListener1 = walNode.log(memTable1.getMemTableId(), node1);
-    WALEntryPosition position1 = flushListener1.getWalPipeHandler().getWalEntryPosition();
+    WALEntryPosition position1 = flushListener1.getWalEntryHandler().getWalEntryPosition();
     InsertRowNode node2 = getInsertRowNode(devicePath, System.currentTimeMillis());
     node1.setSearchIndex(2);
     WALFlushListener flushListener2 = walNode.log(memTable1.getMemTableId(), node2);
-    WALEntryPosition position2 = flushListener2.getWalPipeHandler().getWalEntryPosition();
+    WALEntryPosition position2 = flushListener2.getWalEntryHandler().getWalEntryPosition();
     // write memTable2
     IMemTable memTable2 = new PrimitiveMemTable();
     walNode.onMemTableCreated(memTable2, logDirectory + "/" + "fake2.tsfile");
     InsertRowNode node3 = getInsertRowNode(devicePath, System.currentTimeMillis());
     node1.setSearchIndex(3);
     WALFlushListener flushListener3 = walNode.log(memTable2.getMemTableId(), node3);
-    WALEntryPosition position3 = flushListener3.getWalPipeHandler().getWalEntryPosition();
+    WALEntryPosition position3 = flushListener3.getWalEntryHandler().getWalEntryPosition();
     // wait until wal flushed
     walNode.rollWALFile();
     while (!walNode.isAllWALEntriesConsumed() || !position3.canRead()) {