You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/09 16:55:11 UTC

[iotdb] 08/08: ListenablePendingQueue

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5848
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2f29861d7a90f8501682201ee19dc3e82bf0c429
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed May 10 00:51:46 2023 +0800

    ListenablePendingQueue
---
 .../core/collector/IoTDBDataRegionCollector.java   |  4 +--
 .../PipeRealtimeDataRegionHybridCollector.java     | 13 +++++-----
 .../connector/PipeConnectorSubtaskLifeCycle.java   | 30 +++++++++++-----------
 .../connector/PipeConnectorSubtaskManager.java     |  8 +++---
 .../event/view/collector/PipeEventCollector.java   |  6 ++---
 .../pipe/task/{binder => queue}/EventSupplier.java |  2 +-
 .../ListenableBlockingPendingQueue.java}           | 13 +++++++---
 .../ListenablePendingQueue.java}                   | 22 ++++++----------
 .../ListenableUnblockingPendingQueue.java}         | 13 +++++++---
 .../PendingQueueEmptyToNotEmptyListener.java       |  2 +-
 .../PendingQueueFullToNotFullListener.java         |  2 +-
 .../PendingQueueNotEmptyToEmptyListener.java       |  2 +-
 .../PendingQueueNotFullToFullListener.java         |  2 +-
 .../db/pipe/task/stage/PipeTaskCollectorStage.java | 10 +++-----
 .../db/pipe/task/stage/PipeTaskConnectorStage.java |  4 +--
 .../db/pipe/task/stage/PipeTaskProcessorStage.java | 13 +++++-----
 .../db/pipe/task/subtask/PipeConnectorSubtask.java |  8 +++---
 .../db/pipe/task/subtask/PipeProcessorSubtask.java |  2 +-
 .../core/collector/PipeRealtimeCollectTest.java    |  9 ++++---
 .../executor/PipeConnectorSubtaskExecutorTest.java |  4 +--
 .../executor/PipeProcessorSubtaskExecutorTest.java |  2 +-
 21 files changed, 90 insertions(+), 81 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
index d5638d67a5..d7526fa187 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.pipe.core.collector;
 import org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionTsFileCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
-import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
@@ -39,7 +39,7 @@ public class IoTDBDataRegionCollector implements PipeCollector {
   // TODO: support pattern in historical collector
   private final PipeHistoricalDataRegionTsFileCollector historicalCollector;
 
-  public IoTDBDataRegionCollector(PendingQueue<Event> collectorPendingQueue) {
+  public IoTDBDataRegionCollector(ListenableUnblockingPendingQueue<Event> collectorPendingQueue) {
     hasBeenStarted = new AtomicBoolean(false);
     realtimeCollector = new PipeRealtimeDataRegionHybridCollector(collectorPendingQueue);
     historicalCollector = new PipeHistoricalDataRegionTsFileCollector();
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index 9115d91ef8..d8b79fbfcf 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.pipe.core.collector.realtime;
 import org.apache.iotdb.db.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
 import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
-import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
 import org.apache.iotdb.pipe.api.event.Event;
 
 import org.slf4j.Logger;
@@ -37,9 +37,10 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
   // TODO: memory control
   // This queue is used to store pending events collected by the method collect(). The method
   // supply() will poll events from this queue and send them to the next pipe plugin.
-  private final PendingQueue<Event> pendingQueue;
+  private final ListenableUnblockingPendingQueue<Event> pendingQueue;
 
-  public PipeRealtimeDataRegionHybridCollector(PendingQueue<Event> pendingQueue) {
+  public PipeRealtimeDataRegionHybridCollector(
+      ListenableUnblockingPendingQueue<Event> pendingQueue) {
     this.pendingQueue = pendingQueue;
   }
 
@@ -87,9 +88,9 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
           String.format(
               "Pending Queue of Hybrid Realtime Collector %s has reached capacity, discard TsFile Event %s, current state %s",
               this, event, event.getTsFileEpoch().getState(this)));
-      // TODO: more degradation strategies
-      // TODO: dynamic control of the pending queue capacity
-      // TODO: should be handled by the PipeRuntimeAgent
+      // This should not happen, but the case is handled defensively.
+      // ListenableUnblockingPendingQueue is unbounded, so it should never reach capacity.
+      // TODO: apply memory control when too many elements accumulate in the queue.
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
index f794a7a7a7..9ed53ae310 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.pipe.core.connector;
 
 import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
-import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
 import org.apache.iotdb.pipe.api.event.Event;
 
@@ -28,7 +28,7 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
 
   private final PipeConnectorSubtaskExecutor executor;
   private final PipeConnectorSubtask subtask;
-  private final PendingQueue<Event> pendingQueue;
+  private final ListenableBlockingPendingQueue<Event> pendingQueue;
 
   private int runningTaskCount;
   private int aliveTaskCount;
@@ -36,20 +36,20 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
   public PipeConnectorSubtaskLifeCycle(
       PipeConnectorSubtaskExecutor executor,
       PipeConnectorSubtask subtask,
-      PendingQueue<Event> pendingQueue) {
+      ListenableBlockingPendingQueue<Event> pendingQueue) {
     this.executor = executor;
     this.subtask = subtask;
-    this.pendingQueue =
-        pendingQueue
-            .registerEmptyToNotEmptyListener(
-                subtask.getTaskID(),
-                () -> {
-                  if (hasRunningTasks()) {
-                    executor.start(subtask.getTaskID());
-                  }
-                })
-            .registerNotEmptyToEmptyListener(
-                subtask.getTaskID(), () -> executor.stop(subtask.getTaskID()));
+    this.pendingQueue = pendingQueue;
+
+    pendingQueue.registerEmptyToNotEmptyListener(
+        subtask.getTaskID(),
+        () -> {
+          if (hasRunningTasks()) {
+            executor.start(subtask.getTaskID());
+          }
+        });
+    this.pendingQueue.registerNotEmptyToEmptyListener(
+        subtask.getTaskID(), () -> executor.stop(subtask.getTaskID()));
 
     runningTaskCount = 0;
     aliveTaskCount = 0;
@@ -59,7 +59,7 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
     return subtask;
   }
 
-  public PendingQueue<Event> getPendingQueue() {
+  public ListenableBlockingPendingQueue<Event> getPendingQueue() {
     return pendingQueue;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
index feb2797022..070e4e05b2 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.core.connector;
 
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
-import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
@@ -45,7 +45,8 @@ public class PipeConnectorSubtaskManager {
     if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
       final PipeConnector pipeConnector = PipeAgent.plugin().reflectConnector(connectorAttributes);
       // TODO: make pendingQueue size configurable
-      final PendingQueue<Event> pendingQueue = new PendingQueue<>(1024 * 1024);
+      final ListenableBlockingPendingQueue<Event> pendingQueue =
+          new ListenableBlockingPendingQueue<>(65535);
       final PipeConnectorSubtask pipeConnectorSubtask =
           new PipeConnectorSubtask(attributeSortedString, pendingQueue, pipeConnector);
       final PipeConnectorSubtaskLifeCycle pipeConnectorSubtaskLifeCycle =
@@ -97,7 +98,8 @@ public class PipeConnectorSubtaskManager {
     return attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).getSubtask();
   }
 
-  public PendingQueue<Event> getPipeConnectorPendingQueue(String attributeSortedString) {
+  public ListenableBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
+      String attributeSortedString) {
     if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
       throw new PipeException(
           "Failed to get PendingQueue. No such subtask: " + attributeSortedString);
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
index 01289395b8..0d1d60fdde 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.core.event.view.collector;
 
-import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
@@ -31,7 +31,7 @@ import java.util.Queue;
 
 public class PipeEventCollector implements EventCollector {
 
-  private final PendingQueue<Event> pendingQueue;
+  private final ListenableBlockingPendingQueue<Event> pendingQueue;
 
   // buffer queue is used to store events that are not offered to pending queue
   // because the pending queue is full. when pending queue is full, pending queue
@@ -41,7 +41,7 @@ public class PipeEventCollector implements EventCollector {
   // events before events in buffer queue are offered to pending queue.
   private final Queue<Event> bufferQueue;
 
-  public PipeEventCollector(PendingQueue<Event> pendingQueue) {
+  public PipeEventCollector(ListenableBlockingPendingQueue<Event> pendingQueue) {
     this.pendingQueue = pendingQueue;
     bufferQueue = new LinkedList<>();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/EventSupplier.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/EventSupplier.java
similarity index 96%
rename from server/src/main/java/org/apache/iotdb/db/pipe/task/binder/EventSupplier.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/EventSupplier.java
index 01abb027d2..ea056dc22a 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/EventSupplier.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/EventSupplier.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task.binder;
+package org.apache.iotdb.db.pipe.task.queue;
 
 import org.apache.iotdb.pipe.api.event.Event;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotFullToFullListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
similarity index 69%
copy from server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotFullToFullListener.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
index fe84f3ca4b..ab3090a0ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotFullToFullListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
@@ -17,10 +17,15 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task.binder;
+package org.apache.iotdb.db.pipe.task.queue;
 
-@FunctionalInterface
-public interface PendingQueueNotFullToFullListener {
+import org.apache.iotdb.pipe.api.event.Event;
 
-  void onPendingQueueNotFullToFull();
+import org.eclipse.jetty.util.BlockingArrayQueue;
+
+public class ListenableBlockingPendingQueue<E extends Event> extends ListenablePendingQueue<E> {
+
+  public ListenableBlockingPendingQueue(int pendingQueueSize) {
+    super(new BlockingArrayQueue<>(pendingQueueSize));
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenablePendingQueue.java
similarity index 88%
rename from server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueue.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenablePendingQueue.java
index 1e123b7390..f476d88053 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenablePendingQueue.java
@@ -17,17 +17,16 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task.binder;
+package org.apache.iotdb.db.pipe.task.queue;
 
 import org.apache.iotdb.pipe.api.event.Event;
 
 import java.util.Map;
 import java.util.Queue;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-public class PendingQueue<E extends Event> {
+public abstract class ListenablePendingQueue<E extends Event> {
 
   private final Queue<E> pendingQueue;
 
@@ -42,12 +41,11 @@ public class PendingQueue<E extends Event> {
 
   private final AtomicBoolean isFull = new AtomicBoolean(false);
 
-  public PendingQueue(int pendingQueueSize) {
-    // TODO: make the size of the queue size reasonable and configurable
-    this.pendingQueue = new ArrayBlockingQueue<>(pendingQueueSize);
+  protected ListenablePendingQueue(Queue<E> pendingQueue) {
+    this.pendingQueue = pendingQueue;
   }
 
-  public PendingQueue<E> registerEmptyToNotEmptyListener(
+  public ListenablePendingQueue<E> registerEmptyToNotEmptyListener(
       String id, PendingQueueEmptyToNotEmptyListener listener) {
     emptyToNotEmptyListeners.put(id, listener);
     return this;
@@ -63,7 +61,7 @@ public class PendingQueue<E extends Event> {
         .forEach(PendingQueueEmptyToNotEmptyListener::onPendingQueueEmptyToNotEmpty);
   }
 
-  public PendingQueue<E> registerNotEmptyToEmptyListener(
+  public ListenablePendingQueue<E> registerNotEmptyToEmptyListener(
       String id, PendingQueueNotEmptyToEmptyListener listener) {
     notEmptyToEmptyListeners.put(id, listener);
     return this;
@@ -79,7 +77,7 @@ public class PendingQueue<E extends Event> {
         .forEach(PendingQueueNotEmptyToEmptyListener::onPendingQueueNotEmptyToEmpty);
   }
 
-  public PendingQueue<E> registerFullToNotFullListener(
+  public ListenablePendingQueue<E> registerFullToNotFullListener(
       String id, PendingQueueFullToNotFullListener listener) {
     fullToNotFullListeners.put(id, listener);
     return this;
@@ -95,7 +93,7 @@ public class PendingQueue<E extends Event> {
         .forEach(PendingQueueFullToNotFullListener::onPendingQueueFullToNotFull);
   }
 
-  public PendingQueue<E> registerNotFullToFullListener(
+  public ListenablePendingQueue<E> registerNotFullToFullListener(
       String id, PendingQueueNotFullToFullListener listener) {
     notFullToFullListeners.put(id, listener);
     return this;
@@ -158,8 +156,4 @@ public class PendingQueue<E extends Event> {
   public int size() {
     return pendingQueue.size();
   }
-
-  public void disable() {
-    pendingQueue = null;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotFullToFullListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableUnblockingPendingQueue.java
similarity index 70%
copy from server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotFullToFullListener.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableUnblockingPendingQueue.java
index fe84f3ca4b..c2772b4eeb 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotFullToFullListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableUnblockingPendingQueue.java
@@ -17,10 +17,15 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task.binder;
+package org.apache.iotdb.db.pipe.task.queue;
 
-@FunctionalInterface
-public interface PendingQueueNotFullToFullListener {
+import org.apache.iotdb.pipe.api.event.Event;
 
-  void onPendingQueueNotFullToFull();
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class ListenableUnblockingPendingQueue<E extends Event> extends ListenablePendingQueue<E> {
+
+  public ListenableUnblockingPendingQueue() {
+    super(new ConcurrentLinkedQueue<>());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueEmptyToNotEmptyListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueEmptyToNotEmptyListener.java
similarity index 95%
rename from server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueEmptyToNotEmptyListener.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueEmptyToNotEmptyListener.java
index 6d855cdb46..d56b6e789e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueEmptyToNotEmptyListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueEmptyToNotEmptyListener.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task.binder;
+package org.apache.iotdb.db.pipe.task.queue;
 
 @FunctionalInterface
 public interface PendingQueueEmptyToNotEmptyListener {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueFullToNotFullListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueFullToNotFullListener.java
similarity index 95%
rename from server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueFullToNotFullListener.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueFullToNotFullListener.java
index 91b6192c2a..33225505f7 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueFullToNotFullListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueFullToNotFullListener.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task.binder;
+package org.apache.iotdb.db.pipe.task.queue;
 
 @FunctionalInterface
 public interface PendingQueueFullToNotFullListener {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotEmptyToEmptyListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotEmptyToEmptyListener.java
similarity index 95%
rename from server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotEmptyToEmptyListener.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotEmptyToEmptyListener.java
index 80b89045bf..4225783739 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotEmptyToEmptyListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotEmptyToEmptyListener.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task.binder;
+package org.apache.iotdb.db.pipe.task.queue;
 
 @FunctionalInterface
 public interface PendingQueueNotEmptyToEmptyListener {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotFullToFullListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotFullToFullListener.java
similarity index 95%
rename from server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotFullToFullListener.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotFullToFullListener.java
index fe84f3ca4b..2433cd4b8d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotFullToFullListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotFullToFullListener.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task.binder;
+package org.apache.iotdb.db.pipe.task.queue;
 
 @FunctionalInterface
 public interface PendingQueueNotFullToFullListener {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index 770614cb6f..161890af40 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -22,9 +22,8 @@ package org.apache.iotdb.db.pipe.task.stage;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
-import org.apache.iotdb.db.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.core.collector.IoTDBDataRegionCollector;
-import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -45,7 +44,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
    * processing, and it also can notify the PipeTaskProcessorStage to start processing data when the
    * queue is not empty.
    */
-  private PendingQueue<Event> collectorPendingQueue;
+  private ListenableUnblockingPendingQueue<Event> collectorPendingQueue;
 
   private PipeCollector pipeCollector;
 
@@ -63,8 +62,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
             PipeCollectorConstant.COLLECTOR_KEY,
             BuiltinPipePlugin.DEFAULT_COLLECTOR.getPipePluginName())
         .equals(BuiltinPipePlugin.DEFAULT_COLLECTOR.getPipePluginName())) {
-      collectorPendingQueue =
-          new PendingQueue<>(PipeConfig.getInstance().getRealtimeCollectorPendingQueueCapacity());
+      collectorPendingQueue = new ListenableUnblockingPendingQueue<>();
       this.pipeCollector = new IoTDBDataRegionCollector(collectorPendingQueue);
     } else {
       this.pipeCollector = PipeAgent.plugin().reflectCollector(collectorParameters);
@@ -94,7 +92,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
     }
   }
 
-  public PendingQueue<Event> getCollectorPendingQueue() {
+  public ListenableUnblockingPendingQueue<Event> getCollectorPendingQueue() {
     return collectorPendingQueue;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index e385b1266f..560ee2ae86 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.task.stage;
 
 import org.apache.iotdb.db.pipe.core.connector.PipeConnectorSubtaskManager;
 import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
-import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -60,7 +60,7 @@ public class PipeTaskConnectorStage extends PipeTaskStage {
     PipeConnectorSubtaskManager.instance().deregister(connectorSubtaskId);
   }
 
-  public PendingQueue<Event> getPipeConnectorPendingQueue() {
+  public ListenableBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
     return PipeConnectorSubtaskManager.instance().getPipeConnectorPendingQueue(connectorSubtaskId);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 8aa112d04d..503e97c6e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -24,8 +24,9 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.core.event.view.collector.PipeEventCollector;
 import org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
-import org.apache.iotdb.db.pipe.task.binder.EventSupplier;
-import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenablePendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
@@ -41,8 +42,8 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
 
   protected final PipeProcessorSubtask subtask;
 
-  protected final PendingQueue<Event> pipeCollectorInputPendingQueue;
-  protected final PendingQueue<Event> pipeConnectorOutputPendingQueue;
+  protected final ListenablePendingQueue<Event> pipeCollectorInputPendingQueue;
+  protected final ListenablePendingQueue<Event> pipeConnectorOutputPendingQueue;
 
   /**
    * @param pipeName pipe name
@@ -57,9 +58,9 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
       String pipeName,
       String dataRegionId,
       EventSupplier pipeCollectorInputEventSupplier,
-      @Nullable PendingQueue<Event> pipeCollectorInputPendingQueue,
+      @Nullable ListenablePendingQueue<Event> pipeCollectorInputPendingQueue,
       PipeParameters pipeProcessorParameters,
-      PendingQueue<Event> pipeConnectorOutputPendingQueue) {
+      ListenableBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue) {
     final String taskId = pipeName + "_" + dataRegionId;
     final PipeProcessor pipeProcessor =
         PipeAgent.plugin().reflectProcessor(pipeProcessorParameters);
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index 48aa466bd8..5de09bdd8b 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.pipe.task.subtask;
 
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
@@ -37,12 +37,14 @@ public class PipeConnectorSubtask extends PipeSubtask {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeConnectorSubtask.class);
 
-  private final PendingQueue inputPendingQueue;
+  private final ListenableBlockingPendingQueue<Event> inputPendingQueue;
   private final PipeConnector outputPipeConnector;
 
   /** @param taskID connectorAttributeSortedString */
   public PipeConnectorSubtask(
-      String taskID, PendingQueue inputPendingQueue, PipeConnector outputPipeConnector) {
+      String taskID,
+      ListenableBlockingPendingQueue<Event> inputPendingQueue,
+      PipeConnector outputPipeConnector) {
     super(taskID);
     this.inputPendingQueue = inputPendingQueue;
     this.outputPipeConnector = outputPipeConnector;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index cc94de2bb7..3b7a59aa9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.pipe.task.subtask;
 
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
-import org.apache.iotdb.db.pipe.task.binder.EventSupplier;
+import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.event.Event;
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index 46ce856b82..214b8b0774 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
+import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.EventType;
@@ -83,13 +84,13 @@ public class PipeRealtimeCollectTest {
     // set up realtime collector
 
     try (PipeRealtimeDataRegionHybridCollector collector1 =
-            new PipeRealtimeDataRegionHybridCollector(collectorPendingQueue);
+            new PipeRealtimeDataRegionHybridCollector(new ListenableUnblockingPendingQueue<>());
         PipeRealtimeDataRegionHybridCollector collector2 =
-            new PipeRealtimeDataRegionHybridCollector(collectorPendingQueue);
+            new PipeRealtimeDataRegionHybridCollector(new ListenableUnblockingPendingQueue<>());
         PipeRealtimeDataRegionHybridCollector collector3 =
-            new PipeRealtimeDataRegionHybridCollector(collectorPendingQueue);
+            new PipeRealtimeDataRegionHybridCollector(new ListenableUnblockingPendingQueue<>());
         PipeRealtimeDataRegionHybridCollector collector4 =
-            new PipeRealtimeDataRegionHybridCollector(collectorPendingQueue)) {
+            new PipeRealtimeDataRegionHybridCollector(new ListenableUnblockingPendingQueue<>())) {
 
       collector1.customize(
           new PipeParameters(
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
index 517cf899a4..11007ebf3e 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.execution.executor;
 
-import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
 import org.apache.iotdb.pipe.api.PipeConnector;
 
@@ -38,7 +38,7 @@ public class PipeConnectorSubtaskExecutorTest extends PipeSubtaskExecutorTest {
         Mockito.spy(
             new PipeConnectorSubtask(
                 "PipeConnectorSubtaskExecutorTest",
-                mock(PendingQueue.class),
+                mock(ListenableBlockingPendingQueue.class),
                 mock(PipeConnector.class)) {
               @Override
               public void executeForAWhile() {}
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
index 88997a0ca7..027405d872 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.execution.executor;
 
-import org.apache.iotdb.db.pipe.task.binder.EventSupplier;
+import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
 import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;