You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/31 11:54:04 UTC

[iotdb] branch master updated: [IOTDB-5958] Pipe: remove listenable features of PendingQueue to improve performance (#10004)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0db34827a9a [IOTDB-5958] Pipe: remove listenable features of PendingQueue to improve performance (#10004)
0db34827a9a is described below

commit 0db34827a9a670bf8a4dbb43484f270a275ef22a
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed May 31 19:53:57 2023 +0800

    [IOTDB-5958] Pipe: remove listenable features of PendingQueue to improve performance (#10004)
---
 .../core/collector/IoTDBDataRegionCollector.java   |   6 +-
 .../PipeRealtimeDataRegionHybridCollector.java     |  19 ++-
 .../PipeRealtimeDataRegionLogCollector.java        |  10 +-
 .../PipeRealtimeDataRegionTsFileCollector.java     |  10 +-
 .../manager/PipeConnectorSubtaskLifeCycle.java     |  25 +--
 .../manager/PipeConnectorSubtaskManager.java       |   8 +-
 .../event/view/collector/PipeEventCollector.java   |  17 +-
 .../db/pipe/task/queue/BlockingPendingQueue.java   |  73 +++++++++
 ...Queue.java => BoundedBlockingPendingQueue.java} |   9 +-
 .../task/queue/ListenableBlockingPendingQueue.java | 176 ---------------------
 .../queue/PendingQueueEmptyToNotEmptyListener.java |  26 ---
 .../queue/PendingQueueFullToNotFullListener.java   |  26 ---
 .../queue/PendingQueueNotEmptyToEmptyListener.java |  26 ---
 .../queue/PendingQueueNotFullToFullListener.java   |  26 ---
 ...eue.java => UnboundedBlockingPendingQueue.java} |   5 +-
 .../db/pipe/task/stage/PipeTaskCollectorStage.java |   8 +-
 .../db/pipe/task/stage/PipeTaskConnectorStage.java |   4 +-
 .../db/pipe/task/stage/PipeTaskProcessorStage.java |  60 ++-----
 .../db/pipe/task/subtask/PipeConnectorSubtask.java |   6 +-
 .../core/collector/PipeRealtimeCollectTest.java    |  13 +-
 .../executor/PipeConnectorSubtaskExecutorTest.java |   4 +-
 21 files changed, 138 insertions(+), 419 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
index 37a9c5101a9..757599f2b24 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionFa
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionLogCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionTsFileCollector;
-import org.apache.iotdb.db.pipe.task.queue.ListenableUnboundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
@@ -59,7 +59,7 @@ public class IoTDBDataRegionCollector implements PipeCollector {
 
   private final PipeTaskMeta pipeTaskMeta;
   private final long creationTime;
-  private final ListenableUnboundedBlockingPendingQueue<Event> collectorPendingQueue;
+  private final UnboundedBlockingPendingQueue<Event> collectorPendingQueue;
 
   // TODO: support pattern in historical collector
   private PipeHistoricalDataRegionCollector historicalCollector;
@@ -70,7 +70,7 @@ public class IoTDBDataRegionCollector implements PipeCollector {
   public IoTDBDataRegionCollector(
       PipeTaskMeta pipeTaskMeta,
       long creationTime,
-      ListenableUnboundedBlockingPendingQueue<Event> collectorPendingQueue) {
+      UnboundedBlockingPendingQueue<Event> collectorPendingQueue) {
     this.hasBeenStarted = new AtomicBoolean(false);
 
     this.pipeTaskMeta = pipeTaskMeta;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index 872b9c56f07..493240455aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
 import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
-import org.apache.iotdb.db.pipe.task.queue.ListenableUnboundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -39,13 +39,12 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeRealtimeDataRegionHybridCollector.class);
 
-  // TODO: memory control
   // This queue is used to store pending events collected by the method collect(). The method
   // supply() will poll events from this queue and send them to the next pipe plugin.
-  private final ListenableUnboundedBlockingPendingQueue<Event> pendingQueue;
+  private final UnboundedBlockingPendingQueue<Event> pendingQueue;
 
   public PipeRealtimeDataRegionHybridCollector(
-      PipeTaskMeta pipeTaskMeta, ListenableUnboundedBlockingPendingQueue<Event> pendingQueue) {
+      PipeTaskMeta pipeTaskMeta, UnboundedBlockingPendingQueue<Event> pendingQueue) {
     super(pipeTaskMeta);
     this.pendingQueue = pendingQueue;
   }
@@ -86,7 +85,14 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
     }
 
     if (!event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)) {
-      pendingQueue.offer(event);
+      if (!pendingQueue.offer(event)) {
+        LOGGER.warn(
+            String.format(
+                "collectTabletInsertion: pending queue of PipeRealtimeDataRegionHybridCollector %s has reached capacity, discard tablet event %s, current state %s",
+                this, event, event.getTsFileEpoch().getState(this)));
+        // this would not happen, but just in case.
+        // UnboundedBlockingPendingQueue is unbounded, so it should never reach capacity.
+      }
     }
   }
 
@@ -101,11 +107,10 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
     if (!pendingQueue.offer(event)) {
       LOGGER.warn(
           String.format(
-              "Pending Queue of Hybrid Realtime Collector %s has reached capacity, discard TsFile Event %s, current state %s",
+              "collectTsFileInsertion: pending queue of PipeRealtimeDataRegionHybridCollector %s has reached capacity, discard TsFile event %s, current state %s",
               this, event, event.getTsFileEpoch().getState(this)));
       // this would not happen, but just in case.
       // ListenableUnblockingPendingQueue is unbounded, so it should never reach capacity.
-      // TODO: memory control when elements in queue are too many.
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
index c7c96b650e4..e6835187cfc 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
 import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
-import org.apache.iotdb.db.pipe.task.queue.ListenableUnboundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
@@ -36,13 +36,12 @@ public class PipeRealtimeDataRegionLogCollector extends PipeRealtimeDataRegionCo
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeRealtimeDataRegionLogCollector.class);
 
-  // TODO: memory control
   // This queue is used to store pending events collected by the method collect(). The method
   // supply() will poll events from this queue and send them to the next pipe plugin.
-  private final ListenableUnboundedBlockingPendingQueue<Event> pendingQueue;
+  private final UnboundedBlockingPendingQueue<Event> pendingQueue;
 
   public PipeRealtimeDataRegionLogCollector(
-      PipeTaskMeta pipeTaskMeta, ListenableUnboundedBlockingPendingQueue<Event> pendingQueue) {
+      PipeTaskMeta pipeTaskMeta, UnboundedBlockingPendingQueue<Event> pendingQueue) {
     super(pipeTaskMeta);
     this.pendingQueue = pendingQueue;
   }
@@ -58,11 +57,10 @@ public class PipeRealtimeDataRegionLogCollector extends PipeRealtimeDataRegionCo
     if (!pendingQueue.offer(event)) {
       LOGGER.warn(
           String.format(
-              "Pending Queue of Log Realtime Collector %s has reached capacity, discard Tablet Event %s, current state %s",
+              "collect: pending queue of PipeRealtimeDataRegionLogCollector %s has reached capacity, discard tablet event %s, current state %s",
               this, event, event.getTsFileEpoch().getState(this)));
       // this would not happen, but just in case.
       // ListenableUnblockingPendingQueue is unbounded, so it should never reach capacity.
-      // TODO: memory control when elements in queue are too many.
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
index 42bec421eed..1b00fdd974e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
 import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
-import org.apache.iotdb.db.pipe.task.queue.ListenableUnboundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
@@ -36,13 +36,12 @@ public class PipeRealtimeDataRegionTsFileCollector extends PipeRealtimeDataRegio
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeRealtimeDataRegionTsFileCollector.class);
 
-  // TODO: memory control
   // This queue is used to store pending events collected by the method collect(). The method
   // supply() will poll events from this queue and send them to the next pipe plugin.
-  private final ListenableUnboundedBlockingPendingQueue<Event> pendingQueue;
+  private final UnboundedBlockingPendingQueue<Event> pendingQueue;
 
   public PipeRealtimeDataRegionTsFileCollector(
-      PipeTaskMeta pipeTaskMeta, ListenableUnboundedBlockingPendingQueue<Event> pendingQueue) {
+      PipeTaskMeta pipeTaskMeta, UnboundedBlockingPendingQueue<Event> pendingQueue) {
     super(pipeTaskMeta);
     this.pendingQueue = pendingQueue;
   }
@@ -58,11 +57,10 @@ public class PipeRealtimeDataRegionTsFileCollector extends PipeRealtimeDataRegio
     if (!pendingQueue.offer(event)) {
       LOGGER.warn(
           String.format(
-              "Pending Queue of TsFile Realtime Collector %s has reached capacity, discard TsFile Event %s, current state %s",
+              "collect: pending queue of PipeRealtimeDataRegionTsFileCollector %s has reached capacity, discard TsFile event %s, current state %s",
               this, event, event.getTsFileEpoch().getState(this)));
       // this would not happen, but just in case.
       // ListenableUnblockingPendingQueue is unbounded, so it should never reach capacity.
-      // TODO: memory control when elements in queue are too many.
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
index 8827bbb8d59..6936e4610cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.pipe.core.connector.manager;
 
 import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
-import org.apache.iotdb.db.pipe.task.queue.ListenableBoundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.BoundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
 import org.apache.iotdb.pipe.api.event.Event;
 
@@ -28,7 +28,7 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
 
   private final PipeConnectorSubtaskExecutor executor;
   private final PipeConnectorSubtask subtask;
-  private final ListenableBoundedBlockingPendingQueue<Event> pendingQueue;
+  private final BoundedBlockingPendingQueue<Event> pendingQueue;
 
   private int runningTaskCount;
   private int aliveTaskCount;
@@ -36,21 +36,11 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
   public PipeConnectorSubtaskLifeCycle(
       PipeConnectorSubtaskExecutor executor,
       PipeConnectorSubtask subtask,
-      ListenableBoundedBlockingPendingQueue<Event> pendingQueue) {
+      BoundedBlockingPendingQueue<Event> pendingQueue) {
     this.executor = executor;
     this.subtask = subtask;
     this.pendingQueue = pendingQueue;
 
-    pendingQueue.registerEmptyToNotEmptyListener(
-        subtask.getTaskID(),
-        () -> {
-          if (hasRunningTasks()) {
-            executor.start(subtask.getTaskID());
-          }
-        });
-    this.pendingQueue.registerNotEmptyToEmptyListener(
-        subtask.getTaskID(), () -> executor.stop(subtask.getTaskID()));
-
     runningTaskCount = 0;
     aliveTaskCount = 0;
   }
@@ -59,7 +49,7 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
     return subtask;
   }
 
-  public ListenableBoundedBlockingPendingQueue<Event> getPendingQueue() {
+  public BoundedBlockingPendingQueue<Event> getPendingQueue() {
     return pendingQueue;
   }
 
@@ -113,13 +103,6 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
 
   @Override
   public synchronized void close() {
-    pendingQueue.removeEmptyToNotEmptyListener(subtask.getTaskID());
-    pendingQueue.removeNotEmptyToEmptyListener(subtask.getTaskID());
-
     executor.deregister(subtask.getTaskID());
   }
-
-  private synchronized boolean hasRunningTasks() {
-    return runningTaskCount > 0;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
index 87fd30b5a30..94b2975358c 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.config.PipeConnectorConstant;
 import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.IoTDBThriftConnectorV1;
 import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
-import org.apache.iotdb.db.pipe.task.queue.ListenableBoundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.BoundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
@@ -77,8 +77,8 @@ public class PipeConnectorSubtaskManager {
       }
 
       // 2. construct PipeConnectorSubtaskLifeCycle to manage PipeConnectorSubtask's life cycle
-      final ListenableBoundedBlockingPendingQueue<Event> pendingQueue =
-          new ListenableBoundedBlockingPendingQueue<>(
+      final BoundedBlockingPendingQueue<Event> pendingQueue =
+          new BoundedBlockingPendingQueue<>(
               PipeConfig.getInstance().getPipeConnectorPendingQueueSize());
       final PipeConnectorSubtask pipeConnectorSubtask =
           new PipeConnectorSubtask(attributeSortedString, taskMeta, pendingQueue, pipeConnector);
@@ -131,7 +131,7 @@ public class PipeConnectorSubtaskManager {
     return attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).getSubtask();
   }
 
-  public ListenableBoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
+  public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
       String attributeSortedString) {
     if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
       throw new PipeException(
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
index 6f2a7d82591..6a8697eec8b 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.pipe.core.event.view.collector;
 
 import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
-import org.apache.iotdb.db.pipe.task.queue.ListenableBoundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.BoundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.event.Event;
 
@@ -29,7 +29,7 @@ import java.util.Queue;
 
 public class PipeEventCollector implements EventCollector {
 
-  private final ListenableBoundedBlockingPendingQueue<Event> pendingQueue;
+  private final BoundedBlockingPendingQueue<Event> pendingQueue;
 
   // buffer queue is used to store events that are not offered to pending queue
   // because the pending queue is full. when pending queue is full, pending queue
@@ -39,7 +39,7 @@ public class PipeEventCollector implements EventCollector {
   // events before events in buffer queue are offered to pending queue.
   private final Queue<Event> bufferQueue;
 
-  public PipeEventCollector(ListenableBoundedBlockingPendingQueue<Event> pendingQueue) {
+  public PipeEventCollector(BoundedBlockingPendingQueue<Event> pendingQueue) {
     this.pendingQueue = pendingQueue;
     bufferQueue = new LinkedList<>();
   }
@@ -64,15 +64,4 @@ public class PipeEventCollector implements EventCollector {
       bufferQueue.offer(event);
     }
   }
-
-  public synchronized void tryCollectBufferedEvents() {
-    while (!bufferQueue.isEmpty()) {
-      final Event bufferedEvent = bufferQueue.peek();
-      if (pendingQueue.offer(bufferedEvent)) {
-        bufferQueue.poll();
-      } else {
-        return;
-      }
-    }
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/BlockingPendingQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/BlockingPendingQueue.java
new file mode 100644
index 00000000000..07465e5fa36
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/BlockingPendingQueue.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task.queue;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public abstract class BlockingPendingQueue<E extends Event> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(BlockingPendingQueue.class);
+
+  private static final long MAX_BLOCKING_TIME_MS =
+      PipeConfig.getInstance().getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs();
+
+  private final BlockingQueue<E> pendingQueue;
+
+  protected BlockingPendingQueue(BlockingQueue<E> pendingQueue) {
+    this.pendingQueue = pendingQueue;
+  }
+
+  public boolean offer(E event) {
+    boolean isAdded = false;
+    try {
+      isAdded = pendingQueue.offer(event, MAX_BLOCKING_TIME_MS, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.info("pending queue offer is interrupted.", e);
+      Thread.currentThread().interrupt();
+    }
+    return isAdded;
+  }
+
+  public E poll() {
+    E event = null;
+    try {
+      event = pendingQueue.poll(MAX_BLOCKING_TIME_MS, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.info("pending queue poll is interrupted.", e);
+      Thread.currentThread().interrupt();
+    }
+    return event;
+  }
+
+  public void clear() {
+    pendingQueue.clear();
+  }
+
+  public int size() {
+    return pendingQueue.size();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBoundedBlockingPendingQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/BoundedBlockingPendingQueue.java
similarity index 75%
rename from server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBoundedBlockingPendingQueue.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/BoundedBlockingPendingQueue.java
index da91cc4187b..0940fee2753 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBoundedBlockingPendingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/BoundedBlockingPendingQueue.java
@@ -21,12 +21,11 @@ package org.apache.iotdb.db.pipe.task.queue;
 
 import org.apache.iotdb.pipe.api.event.Event;
 
-import org.eclipse.jetty.util.BlockingArrayQueue;
+import java.util.concurrent.ArrayBlockingQueue;
 
-public class ListenableBoundedBlockingPendingQueue<E extends Event>
-    extends ListenableBlockingPendingQueue<E> {
+public class BoundedBlockingPendingQueue<E extends Event> extends BlockingPendingQueue<E> {
 
-  public ListenableBoundedBlockingPendingQueue(int pendingQueueSize) {
-    super(new BlockingArrayQueue<>(pendingQueueSize));
+  public BoundedBlockingPendingQueue(int pendingQueueSize) {
+    super(new ArrayBlockingQueue<>(pendingQueueSize));
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
deleted file mode 100644
index 265bdf09234..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.task.queue;
-
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.pipe.api.event.Event;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public abstract class ListenableBlockingPendingQueue<E extends Event> {
-
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(ListenableBlockingPendingQueue.class);
-
-  private static final long MAX_BLOCKING_TIME_MS =
-      PipeConfig.getInstance().getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs();
-
-  private final BlockingQueue<E> pendingQueue;
-
-  private final Map<String, PendingQueueEmptyToNotEmptyListener> emptyToNotEmptyListeners =
-      new ConcurrentHashMap<>();
-  private final Map<String, PendingQueueNotEmptyToEmptyListener> notEmptyToEmptyListeners =
-      new ConcurrentHashMap<>();
-  private final Map<String, PendingQueueFullToNotFullListener> fullToNotFullListeners =
-      new ConcurrentHashMap<>();
-  private final Map<String, PendingQueueNotFullToFullListener> notFullToFullListeners =
-      new ConcurrentHashMap<>();
-
-  private final AtomicBoolean isFull = new AtomicBoolean(false);
-
-  protected ListenableBlockingPendingQueue(BlockingQueue<E> pendingQueue) {
-    this.pendingQueue = pendingQueue;
-  }
-
-  public ListenableBlockingPendingQueue<E> registerEmptyToNotEmptyListener(
-      String id, PendingQueueEmptyToNotEmptyListener listener) {
-    emptyToNotEmptyListeners.put(id, listener);
-    return this;
-  }
-
-  public void removeEmptyToNotEmptyListener(String id) {
-    emptyToNotEmptyListeners.remove(id);
-  }
-
-  public void notifyEmptyToNotEmptyListeners() {
-    emptyToNotEmptyListeners
-        .values()
-        .forEach(PendingQueueEmptyToNotEmptyListener::onPendingQueueEmptyToNotEmpty);
-  }
-
-  public ListenableBlockingPendingQueue<E> registerNotEmptyToEmptyListener(
-      String id, PendingQueueNotEmptyToEmptyListener listener) {
-    notEmptyToEmptyListeners.put(id, listener);
-    return this;
-  }
-
-  public void removeNotEmptyToEmptyListener(String id) {
-    notEmptyToEmptyListeners.remove(id);
-  }
-
-  public void notifyNotEmptyToEmptyListeners() {
-    notEmptyToEmptyListeners
-        .values()
-        .forEach(PendingQueueNotEmptyToEmptyListener::onPendingQueueNotEmptyToEmpty);
-  }
-
-  public ListenableBlockingPendingQueue<E> registerFullToNotFullListener(
-      String id, PendingQueueFullToNotFullListener listener) {
-    fullToNotFullListeners.put(id, listener);
-    return this;
-  }
-
-  public void removeFullToNotFullListener(String id) {
-    fullToNotFullListeners.remove(id);
-  }
-
-  public void notifyFullToNotFullListeners() {
-    fullToNotFullListeners
-        .values()
-        .forEach(PendingQueueFullToNotFullListener::onPendingQueueFullToNotFull);
-  }
-
-  public ListenableBlockingPendingQueue<E> registerNotFullToFullListener(
-      String id, PendingQueueNotFullToFullListener listener) {
-    notFullToFullListeners.put(id, listener);
-    return this;
-  }
-
-  public void removeNotFullToFullListener(String id) {
-    notFullToFullListeners.remove(id);
-  }
-
-  public void notifyNotFullToFullListeners() {
-    notFullToFullListeners
-        .values()
-        .forEach(PendingQueueNotFullToFullListener::onPendingQueueNotFullToFull);
-  }
-
-  public boolean offer(E event) {
-    final boolean isEmpty = pendingQueue.isEmpty();
-    final boolean isAdded = pendingQueue.offer(event);
-
-    if (isAdded) {
-      // we don't use size() == 1 to check whether the listener should be called,
-      // because offer() and size() are not atomic, and we don't want to use lock
-      // to make them atomic.
-      if (isEmpty) {
-        notifyEmptyToNotEmptyListeners();
-      }
-    } else {
-      if (isFull.compareAndSet(false, true)) {
-        notifyNotFullToFullListeners();
-      }
-    }
-
-    return isAdded;
-  }
-
-  public E poll() {
-    final boolean isEmpty = pendingQueue.isEmpty();
-    E event = null;
-    try {
-      event = pendingQueue.poll(MAX_BLOCKING_TIME_MS, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-      LOGGER.info("pending queue poll is interrupted.", e);
-      Thread.currentThread().interrupt();
-    }
-
-    if (event == null) {
-      // we don't use size() == 0 to check whether the listener should be called,
-      // because poll() and size() are not atomic, and we don't want to use lock
-      // to make them atomic.
-      if (!isEmpty) {
-        notifyNotEmptyToEmptyListeners();
-      }
-    } else {
-      if (isFull.compareAndSet(true, false)) {
-        notifyFullToNotFullListeners();
-      }
-    }
-
-    return event;
-  }
-
-  public void clear() {
-    pendingQueue.clear();
-  }
-
-  public int size() {
-    return pendingQueue.size();
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueEmptyToNotEmptyListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueEmptyToNotEmptyListener.java
deleted file mode 100644
index d56b6e789ee..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueEmptyToNotEmptyListener.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.task.queue;
-
-@FunctionalInterface
-public interface PendingQueueEmptyToNotEmptyListener {
-
-  void onPendingQueueEmptyToNotEmpty();
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueFullToNotFullListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueFullToNotFullListener.java
deleted file mode 100644
index 33225505f71..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueFullToNotFullListener.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.task.queue;
-
-@FunctionalInterface
-public interface PendingQueueFullToNotFullListener {
-
-  void onPendingQueueFullToNotFull();
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotEmptyToEmptyListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotEmptyToEmptyListener.java
deleted file mode 100644
index 42257837391..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotEmptyToEmptyListener.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.task.queue;
-
-@FunctionalInterface
-public interface PendingQueueNotEmptyToEmptyListener {
-
-  void onPendingQueueNotEmptyToEmpty();
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotFullToFullListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotFullToFullListener.java
deleted file mode 100644
index 2433cd4b8de..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotFullToFullListener.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.task.queue;
-
-@FunctionalInterface
-public interface PendingQueueNotFullToFullListener {
-
-  void onPendingQueueNotFullToFull();
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableUnboundedBlockingPendingQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/UnboundedBlockingPendingQueue.java
similarity index 85%
rename from server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableUnboundedBlockingPendingQueue.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/UnboundedBlockingPendingQueue.java
index 63611067190..3a94393dd08 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableUnboundedBlockingPendingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/UnboundedBlockingPendingQueue.java
@@ -23,10 +23,9 @@ import org.apache.iotdb.pipe.api.event.Event;
 
 import java.util.concurrent.LinkedBlockingQueue;
 
-public class ListenableUnboundedBlockingPendingQueue<E extends Event>
-    extends ListenableBlockingPendingQueue<E> {
+public class UnboundedBlockingPendingQueue<E extends Event> extends BlockingPendingQueue<E> {
 
-  public ListenableUnboundedBlockingPendingQueue() {
+  public UnboundedBlockingPendingQueue() {
     super(new LinkedBlockingQueue<>());
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index 60488c6d253..a73a9da8b9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import org.apache.iotdb.db.pipe.core.collector.IoTDBDataRegionCollector;
 import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
-import org.apache.iotdb.db.pipe.task.queue.ListenableUnboundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
@@ -51,7 +51,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
    * processing, and it also can notify the PipeTaskProcessorStage to start processing data when the
    * queue is not empty.
    */
-  private ListenableUnboundedBlockingPendingQueue<Event> collectorPendingQueue;
+  private UnboundedBlockingPendingQueue<Event> collectorPendingQueue;
 
   private final PipeCollector pipeCollector;
 
@@ -77,7 +77,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
           .getAttribute()
           .put(PipeCollectorConstant.DATA_REGION_KEY, String.valueOf(dataRegionId.getId()));
 
-      collectorPendingQueue = new ListenableUnboundedBlockingPendingQueue<>();
+      collectorPendingQueue = new UnboundedBlockingPendingQueue<>();
       this.pipeCollector =
           new IoTDBDataRegionCollector(pipeTaskMeta, creationTime, collectorPendingQueue);
     } else {
@@ -130,7 +130,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
     return pipeCollector::supply;
   }
 
-  public ListenableUnboundedBlockingPendingQueue<Event> getCollectorPendingQueue() {
+  public UnboundedBlockingPendingQueue<Event> getCollectorPendingQueue() {
     return collectorPendingQueue;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index 98e8ba7e965..5e43b1a2d18 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.pipe.task.stage;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.core.connector.manager.PipeConnectorSubtaskManager;
 import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
-import org.apache.iotdb.db.pipe.task.queue.ListenableBoundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.BoundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -61,7 +61,7 @@ public class PipeTaskConnectorStage extends PipeTaskStage {
     PipeConnectorSubtaskManager.instance().deregister(connectorSubtaskId);
   }
 
-  public ListenableBoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
+  public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
     return PipeConnectorSubtaskManager.instance().getPipeConnectorPendingQueue(connectorSubtaskId);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 40a1b7d7be0..c0bad32a3f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -20,15 +20,14 @@
 package org.apache.iotdb.db.pipe.task.stage;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.core.event.view.collector.PipeEventCollector;
 import org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
+import org.apache.iotdb.db.pipe.task.queue.BlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.BoundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
-import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
-import org.apache.iotdb.db.pipe.task.queue.ListenableBoundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
@@ -48,8 +47,8 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
   protected final PipeProcessor pipeProcessor;
   protected final PipeProcessorSubtask pipeProcessorSubtask;
 
-  protected final ListenableBlockingPendingQueue<Event> pipeCollectorInputPendingQueue;
-  protected final ListenableBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue;
+  protected final BlockingPendingQueue<Event> pipeCollectorInputPendingQueue;
+  protected final BlockingPendingQueue<Event> pipeConnectorOutputPendingQueue;
 
   /**
    * @param pipeName pipe name
@@ -66,9 +65,9 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
       TConsensusGroupId dataRegionId,
       PipeTaskMeta taskMeta,
       EventSupplier pipeCollectorInputEventSupplier,
-      @Nullable ListenableBlockingPendingQueue<Event> pipeCollectorInputPendingQueue,
+      @Nullable BlockingPendingQueue<Event> pipeCollectorInputPendingQueue,
       PipeParameters pipeProcessorParameters,
-      ListenableBoundedBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue) {
+      BoundedBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue) {
     this.pipeProcessorParameters = pipeProcessorParameters;
 
     final String taskId = pipeName + "_" + dataRegionId;
@@ -84,39 +83,8 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
             pipeProcessor,
             pipeConnectorOutputEventCollector);
 
-    final PipeTaskStage pipeTaskStage = this;
-    this.pipeCollectorInputPendingQueue =
-        pipeCollectorInputPendingQueue != null
-            ? pipeCollectorInputPendingQueue
-                .registerEmptyToNotEmptyListener(
-                    taskId,
-                    () -> {
-                      // status can be changed by other threads calling pipeTaskStage's methods
-                      synchronized (pipeTaskStage) {
-                        if (status == PipeStatus.RUNNING) {
-                          executor.start(pipeProcessorSubtask.getTaskID());
-                        }
-                      }
-                    })
-                .registerNotEmptyToEmptyListener(
-                    taskId, () -> executor.stop(pipeProcessorSubtask.getTaskID()))
-            : null;
-    this.pipeConnectorOutputPendingQueue =
-        pipeConnectorOutputPendingQueue
-            .registerNotFullToFullListener(
-                taskId, () -> executor.stop(pipeProcessorSubtask.getTaskID()))
-            .registerFullToNotFullListener(
-                taskId,
-                () -> {
-                  // status can be changed by other threads calling pipeTaskStage's methods
-                  synchronized (pipeTaskStage) {
-                    // only start when the pipe is running
-                    if (status == PipeStatus.RUNNING) {
-                      pipeConnectorOutputEventCollector.tryCollectBufferedEvents();
-                      executor.start(pipeProcessorSubtask.getTaskID());
-                    }
-                  }
-                });
+    this.pipeCollectorInputPendingQueue = pipeCollectorInputPendingQueue;
+    this.pipeConnectorOutputPendingQueue = pipeConnectorOutputPendingQueue;
   }
 
   @Override
@@ -149,16 +117,6 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
 
   @Override
   public void dropSubtask() throws PipeException {
-    final String taskId = pipeProcessorSubtask.getTaskID();
-
-    if (pipeCollectorInputPendingQueue != null) {
-      pipeCollectorInputPendingQueue.removeEmptyToNotEmptyListener(taskId);
-      pipeCollectorInputPendingQueue.removeNotEmptyToEmptyListener(taskId);
-    }
-
-    pipeConnectorOutputPendingQueue.removeNotFullToFullListener(taskId);
-    pipeConnectorOutputPendingQueue.removeFullToNotFullListener(taskId);
-
-    executor.deregister(taskId);
+    executor.deregister(pipeProcessorSubtask.getTaskID());
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index 84a4817d733..12802d88e5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.pipe.task.subtask;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.task.queue.ListenableBoundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.BoundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -39,7 +39,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeConnectorSubtask.class);
 
-  private final ListenableBoundedBlockingPendingQueue<Event> inputPendingQueue;
+  private final BoundedBlockingPendingQueue<Event> inputPendingQueue;
   private final PipeConnector outputPipeConnector;
 
   private static final int HEARTBEAT_CHECK_INTERVAL = 1000;
@@ -49,7 +49,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
   public PipeConnectorSubtask(
       String taskID,
       PipeTaskMeta taskMeta,
-      ListenableBoundedBlockingPendingQueue<Event> inputPendingQueue,
+      BoundedBlockingPendingQueue<Event> inputPendingQueue,
       PipeConnector outputPipeConnector) {
     super(taskID, taskMeta);
     this.inputPendingQueue = inputPendingQueue;
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index 27d28114007..d071c1ae5ef 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
-import org.apache.iotdb.db.pipe.task.queue.ListenableUnboundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.db.wal.utils.WALEntryHandler;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -102,17 +102,14 @@ public class PipeRealtimeCollectTest {
     // set up realtime collector
 
     try (PipeRealtimeDataRegionHybridCollector collector1 =
-            new PipeRealtimeDataRegionHybridCollector(
-                null, new ListenableUnboundedBlockingPendingQueue<>());
+            new PipeRealtimeDataRegionHybridCollector(null, new UnboundedBlockingPendingQueue<>());
         PipeRealtimeDataRegionHybridCollector collector2 =
-            new PipeRealtimeDataRegionHybridCollector(
-                null, new ListenableUnboundedBlockingPendingQueue<>());
+            new PipeRealtimeDataRegionHybridCollector(null, new UnboundedBlockingPendingQueue<>());
         PipeRealtimeDataRegionHybridCollector collector3 =
-            new PipeRealtimeDataRegionHybridCollector(
-                null, new ListenableUnboundedBlockingPendingQueue<>());
+            new PipeRealtimeDataRegionHybridCollector(null, new UnboundedBlockingPendingQueue<>());
         PipeRealtimeDataRegionHybridCollector collector4 =
             new PipeRealtimeDataRegionHybridCollector(
-                null, new ListenableUnboundedBlockingPendingQueue<>())) {
+                null, new UnboundedBlockingPendingQueue<>())) {
 
       collector1.customize(
           new PipeParameters(
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
index 43d1f2eefb8..c01d6f51b39 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.pipe.execution.executor;
 
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
-import org.apache.iotdb.db.pipe.task.queue.ListenableBoundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.BoundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
 import org.apache.iotdb.pipe.api.PipeConnector;
 
@@ -40,7 +40,7 @@ public class PipeConnectorSubtaskExecutorTest extends PipeSubtaskExecutorTest {
             new PipeConnectorSubtask(
                 "PipeConnectorSubtaskExecutorTest",
                 mock(PipeTaskMeta.class),
-                mock(ListenableBoundedBlockingPendingQueue.class),
+                mock(BoundedBlockingPendingQueue.class),
                 mock(PipeConnector.class)));
   }
 }