You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/08/05 05:51:19 UTC

[iotdb] branch master updated: [IOTDB-6100] Pipe: Fix running in hybrid mode will cause wal cannot be deleted & some pipe data lost due to wrong ProducerType of Disruptor (#10790)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3def1f80976 [IOTDB-6100] Pipe: Fix running in hybrid mode will cause wal cannot be deleted & some pipe data lost due to wrong ProducerType of Disruptor (#10790)
3def1f80976 is described below

commit 3def1f809768073cdfa9578bc1abd4fead569ac8
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sat Aug 5 13:51:14 2023 +0800

    [IOTDB-6100] Pipe: Fix running in hybrid mode will cause wal cannot be deleted & some pipe data lost due to wrong ProducerType of Disruptor (#10790)
    
    * Fix: Running in hybrid mode will cause wal cannot be deleted
    * Fix: some pipe data is lost due to wrong ProducerType of Disruptor
---
 .../PipeRealtimeDataRegionHybridExtractor.java     | 90 ++++++++++++++++------
 .../PipeRealtimeDataRegionLogExtractor.java        | 22 ++++--
 .../PipeRealtimeDataRegionTsFileExtractor.java     | 24 +++---
 .../realtime/assigner/DisruptorQueue.java          | 85 ++++++++------------
 .../realtime/assigner/PipeDataRegionAssigner.java  | 10 +--
 .../db/pipe/resource/wal/PipeWALResource.java      |  4 +
 .../pipe/resource/wal/PipeWALResourceManager.java  | 12 ++-
 .../iotdb/commons/concurrent/ThreadName.java       |  2 +
 8 files changed, 147 insertions(+), 102 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index ffefb13daf0..9b8ff792be4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -73,23 +73,41 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
 
   private void extractTabletInsertion(PipeRealtimeEvent event) {
     if (isApproachingCapacity()) {
-      event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE);
       // if the pending queue is approaching capacity, we should not extract any more tablet events.
       // all the data represented by the tablet events should be carried by the following tsfile
       // event.
-      return;
+      event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE);
     }
 
-    if (!event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)
-        && !pendingQueue.waitedOffer(event)) {
-      LOGGER.warn(
-          "extractTabletInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor {} "
-              + "has reached capacity, discard tablet event {}, current state {}",
-          this,
-          event,
-          event.getTsFileEpoch().getState(this));
-      // this would not happen, but just in case.
-      // UnboundedBlockingPendingQueue is unbounded, so it should never reach capacity.
+    final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
+    switch (state) {
+      case USING_TSFILE:
+        // Ignore the tablet event.
+        event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+        break;
+      case EMPTY:
+      case USING_TABLET:
+        if (!pendingQueue.waitedOffer(event)) {
+          // this would not happen, but just in case.
+          // pendingQueue is unbounded, so it should never reach capacity.
+          final String errorMessage =
+              String.format(
+                  "extractTabletInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor %s "
+                      + "has reached capacity, discard tablet event %s, current state %s",
+                  this, event, event.getTsFileEpoch().getState(this));
+          LOGGER.error(errorMessage);
+          PipeAgent.runtime()
+              .report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
+
+          // Ignore the tablet event.
+          event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            String.format(
+                "Unsupported state %s for hybrid realtime extractor %s",
+                state, PipeRealtimeDataRegionHybridExtractor.class.getName()));
     }
   }
 
@@ -101,15 +119,35 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
             state ->
                 state.equals(TsFileEpoch.State.EMPTY) ? TsFileEpoch.State.USING_TSFILE : state);
 
-    if (!pendingQueue.waitedOffer(event)) {
-      LOGGER.warn(
-          "extractTsFileInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor {} "
-              + "has reached capacity, discard TsFile event {}, current state {}",
-          this,
-          event,
-          event.getTsFileEpoch().getState(this));
-      // this would not happen, but just in case.
-      // ListenableUnblockingPendingQueue is unbounded, so it should never reach capacity.
+    final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
+    switch (state) {
+      case EMPTY:
+      case USING_TSFILE:
+        if (!pendingQueue.waitedOffer(event)) {
+          // this would not happen, but just in case.
+          // pendingQueue is unbounded, so it should never reach capacity.
+          final String errorMessage =
+              String.format(
+                  "extractTsFileInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor %s "
+                      + "has reached capacity, discard TsFile event %s, current state %s",
+                  this, event, event.getTsFileEpoch().getState(this));
+          LOGGER.error(errorMessage);
+          PipeAgent.runtime()
+              .report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
+
+          // Ignore the tsfile event.
+          event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+        }
+        break;
+      case USING_TABLET:
+        // All the tablet events have been extracted, so we can ignore the tsfile event.
+        event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            String.format(
+                "Unsupported state %s for hybrid realtime extractor %s",
+                state, PipeRealtimeDataRegionHybridExtractor.class.getName()));
     }
   }
 
@@ -139,6 +177,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
       }
 
       realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+
       if (suppliedEvent != null) {
         return suppliedEvent;
       }
@@ -166,7 +205,10 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
         // this event is not reliable anymore. but the data represented by this event
         // has been carried by the following tsfile event, so we can just discard this event.
         event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE);
-        LOGGER.warn("Increase reference count for event {} error.", event);
+        LOGGER.warn(
+            "Discard tablet event {} because it is not reliable anymore. "
+                + "Change the state of TsFileEpoch to USING_TSFILE.",
+            event);
         return null;
       }
     }
@@ -182,7 +224,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
             state -> {
               // this would not happen, but just in case.
               if (state.equals(TsFileEpoch.State.EMPTY)) {
-                LOGGER.warn(
+                LOGGER.error(
                     String.format("EMPTY TsFileEpoch when supplying TsFile Event %s", event));
                 return TsFileEpoch.State.USING_TSFILE;
               }
@@ -202,7 +244,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
                     + "the reference count can not be increased, "
                     + "the data represented by this event is lost",
                 event.getEvent());
-        LOGGER.warn(errorMessage);
+        LOGGER.error(errorMessage);
         PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
         return null;
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
index 74ff0c533b1..156c8891710 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
@@ -48,18 +48,23 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx
     event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TABLET);
 
     if (!(event.getEvent() instanceof TabletInsertionEvent)) {
+      event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
       return;
     }
 
     if (!pendingQueue.waitedOffer(event)) {
-      LOGGER.warn(
-          "extract: pending queue of PipeRealtimeDataRegionLogExtractor {} "
-              + "has reached capacity, discard tablet event {}, current state {}",
-          this,
-          event,
-          event.getTsFileEpoch().getState(this));
       // this would not happen, but just in case.
-      // ListenableUnblockingPendingQueue is unbounded, so it should never reach capacity.
+      // pendingQueue is unbounded, so it should never reach capacity.
+      final String errorMessage =
+          String.format(
+              "extract: pending queue of PipeRealtimeDataRegionLogExtractor %s "
+                  + "has reached capacity, discard tablet event %s, current state %s",
+              this, event, event.getTsFileEpoch().getState(this));
+      LOGGER.error(errorMessage);
+      PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
+
+      // ignore this event.
+      event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
     }
   }
 
@@ -93,11 +98,12 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx
                     + "the reference count can not be increased, "
                     + "the data represented by this event is lost",
                 realtimeEvent.getEvent());
-        LOGGER.warn(errorMessage);
+        LOGGER.error(errorMessage);
         PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
       }
 
       realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
+
       if (suppliedEvent != null) {
         return suppliedEvent;
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
index bab1ea2ec46..cae2e6f3c1b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
@@ -48,18 +48,23 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio
     event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE);
 
     if (!(event.getEvent() instanceof TsFileInsertionEvent)) {
+      event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
       return;
     }
 
     if (!pendingQueue.waitedOffer(event)) {
-      LOGGER.warn(
-          "extract: pending queue of PipeRealtimeDataRegionTsFileExtractor {} "
-              + "has reached capacity, discard TsFile event {}, current state {}",
-          this,
-          event,
-          event.getTsFileEpoch().getState(this));
-      // this would not happen, but just in case.
-      // ListenableUnblockingPendingQueue is unbounded, so it should never reach capacity.
+      // This would not happen, but just in case.
+      // Pending is unbounded, so it should never reach capacity.
+      final String errorMessage =
+          String.format(
+              "extract: pending queue of PipeRealtimeDataRegionTsFileExtractor %s "
+                  + "has reached capacity, discard TsFile event %s, current state %s",
+              this, event, event.getTsFileEpoch().getState(this));
+      LOGGER.error(errorMessage);
+      PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
+
+      // Ignore the event.
+      event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
     }
   }
 
@@ -93,11 +98,12 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio
                     + "the reference count can not be increased, "
                     + "the data represented by this event is lost",
                 realtimeEvent.getEvent());
-        LOGGER.warn(errorMessage);
+        LOGGER.error(errorMessage);
         PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
       }
 
       realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
+
       if (suppliedEvent != null) {
         return suppliedEvent;
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
index 9c6d97901aa..b7823171da7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
@@ -19,81 +19,62 @@
 
 package org.apache.iotdb.db.pipe.extractor.realtime.assigner;
 
+import org.apache.iotdb.commons.concurrent.IoTDBDaemonThreadFactory;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 
 import com.lmax.disruptor.BlockingWaitStrategy;
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.WaitStrategy;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
-import com.lmax.disruptor.util.DaemonThreadFactory;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ThreadFactory;
+import static org.apache.iotdb.commons.concurrent.ThreadName.PIPE_EXTRACTOR_DISRUPTOR;
 
-public class DisruptorQueue<E> {
+public class DisruptorQueue {
 
-  private Disruptor<Container<E>> disruptor;
-  private RingBuffer<Container<E>> ringBuffer;
+  private static final IoTDBDaemonThreadFactory THREAD_FACTORY =
+      new IoTDBDaemonThreadFactory(PIPE_EXTRACTOR_DISRUPTOR.getName());
 
-  private DisruptorQueue() {}
+  private final Disruptor<EventContainer> disruptor;
+  private final RingBuffer<EventContainer> ringBuffer;
 
-  public void publish(E obj) {
-    ringBuffer.publishEvent((container, sequence, o) -> container.setObj(o), obj);
+  public DisruptorQueue(EventHandler<PipeRealtimeEvent> eventHandler) {
+    disruptor =
+        new Disruptor<>(
+            EventContainer::new,
+            PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferSize(),
+            THREAD_FACTORY,
+            ProducerType.MULTI,
+            new BlockingWaitStrategy());
+    disruptor.handleEventsWith(
+        (container, sequence, endOfBatch) ->
+            eventHandler.onEvent(container.getEvent(), sequence, endOfBatch));
+    disruptor.setDefaultExceptionHandler(new DisruptorQueueExceptionHandler());
+
+    ringBuffer = disruptor.start();
+  }
+
+  public void publish(PipeRealtimeEvent event) {
+    ringBuffer.publishEvent((container, sequence, o) -> container.setEvent(event), event);
   }
 
   public void clear() {
     disruptor.halt();
   }
 
-  public static class Builder<E> {
-    private int ringBufferSize =
-        PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferSize();
-    private ThreadFactory threadFactory = DaemonThreadFactory.INSTANCE;
-    private ProducerType producerType = ProducerType.MULTI;
-    private WaitStrategy waitStrategy = new BlockingWaitStrategy();
-    private final List<EventHandler<E>> handlers = new ArrayList<>();
-
-    public Builder<E> setProducerType(ProducerType producerType) {
-      this.producerType = producerType;
-      return this;
-    }
-
-    public Builder<E> addEventHandler(EventHandler<E> eventHandler) {
-      this.handlers.add(eventHandler);
-      return this;
-    }
-
-    public DisruptorQueue<E> build() {
-      DisruptorQueue<E> disruptorQueue = new DisruptorQueue<>();
-      disruptorQueue.disruptor =
-          new Disruptor<>(
-              Container::new, ringBufferSize, threadFactory, producerType, waitStrategy);
-      for (EventHandler<E> handler : handlers) {
-        disruptorQueue.disruptor.handleEventsWith(
-            (container, sequence, endOfBatch) ->
-                handler.onEvent(container.getObj(), sequence, endOfBatch));
-      }
-      disruptorQueue.disruptor.setDefaultExceptionHandler(new DisruptorQueueExceptionHandler());
-      disruptorQueue.disruptor.start();
-      disruptorQueue.ringBuffer = disruptorQueue.disruptor.getRingBuffer();
-      return disruptorQueue;
-    }
-  }
+  private static class EventContainer {
 
-  private static class Container<E> {
-    private E obj;
+    private PipeRealtimeEvent event;
 
-    private Container() {}
+    private EventContainer() {}
 
-    public E getObj() {
-      return obj;
+    public PipeRealtimeEvent getEvent() {
+      return event;
     }
 
-    public void setObj(E obj) {
-      this.obj = obj;
+    public void setEvent(PipeRealtimeEvent event) {
+      this.event = event;
     }
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
index d5ca1a10b90..ee569e9e4ff 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
@@ -24,23 +24,17 @@ import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtract
 import org.apache.iotdb.db.pipe.extractor.realtime.matcher.CachedSchemaPatternMatcher;
 import org.apache.iotdb.db.pipe.extractor.realtime.matcher.PipeDataRegionMatcher;
 
-import com.lmax.disruptor.dsl.ProducerType;
-
 public class PipeDataRegionAssigner {
 
   /** The matcher is used to match the event with the extractor based on the pattern. */
   private final PipeDataRegionMatcher matcher;
 
   /** The disruptor is used to assign the event to the extractor. */
-  private final DisruptorQueue<PipeRealtimeEvent> disruptor;
+  private final DisruptorQueue disruptor;
 
   public PipeDataRegionAssigner() {
     this.matcher = new CachedSchemaPatternMatcher();
-    this.disruptor =
-        new DisruptorQueue.Builder<PipeRealtimeEvent>()
-            .setProducerType(ProducerType.SINGLE)
-            .addEventHandler(this::assignToExtractor)
-            .build();
+    this.disruptor = new DisruptorQueue(this::assignToExtractor);
   }
 
   public void publishToAssign(PipeRealtimeEvent event) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
index 085c71d5f9c..358e319dafa 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
@@ -162,4 +162,8 @@ public abstract class PipeWALResource implements Closeable {
 
     referenceCount.set(0);
   }
+
+  public int getReferenceCount() {
+    return referenceCount.get();
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index 64cd83f1a67..afbe6931533 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -24,6 +24,9 @@ import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
@@ -34,6 +37,8 @@ import java.util.concurrent.locks.ReentrantLock;
 
 public abstract class PipeWALResourceManager {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeWALResourceManager.class);
+
   protected final Map<Long, PipeWALResource> memtableIdToPipeWALResourceMap;
 
   private static final int SEGMENT_LOCK_COUNT = 32;
@@ -55,7 +60,7 @@ public abstract class PipeWALResourceManager {
     ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
         PIPE_WAL_RESOURCE_TTL_CHECKER,
         () -> {
-          Iterator<Map.Entry<Long, PipeWALResource>> iterator =
+          final Iterator<Map.Entry<Long, PipeWALResource>> iterator =
               memtableIdToPipeWALResourceMap.entrySet().iterator();
           while (iterator.hasNext()) {
             final Map.Entry<Long, PipeWALResource> entry = iterator.next();
@@ -66,6 +71,11 @@ public abstract class PipeWALResourceManager {
             try {
               if (entry.getValue().invalidateIfPossible()) {
                 iterator.remove();
+              } else {
+                LOGGER.info(
+                    "WAL (memtableId {}) is still referenced {} times",
+                    entry.getKey(),
+                    entry.getValue().getReferenceCount());
               }
             } finally {
               lock.unlock();
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index aff32b22695..0382f963a42 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -122,6 +122,7 @@ public enum ThreadName {
   GPRC_DEFAULT_WORKER_ELG("grpc-default-worker-ELG"),
   GROUP_MANAGEMENT("groupManagement"),
   // -------------------------- Compute --------------------------
+  PIPE_EXTRACTOR_DISRUPTOR("Pipe-Extractor-Disruptor"),
   PIPE_ASSIGNER_EXECUTOR_POOL("Pipe-Assigner-Executor-Pool"),
   PIPE_PROCESSOR_EXECUTOR_POOL("Pipe-Processor-Executor-Pool"),
   PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"),
@@ -257,6 +258,7 @@ public enum ThreadName {
   private static final Set<ThreadName> computeThreadNames =
       new HashSet<>(
           Arrays.asList(
+              PIPE_EXTRACTOR_DISRUPTOR,
               PIPE_ASSIGNER_EXECUTOR_POOL,
               PIPE_PROCESSOR_EXECUTOR_POOL,
               PIPE_CONNECTOR_EXECUTOR_POOL,