You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/31 08:05:26 UTC

[iotdb] branch reduce-submit-self created (now 5813c76c3d0)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a change to branch reduce-submit-self
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 5813c76c3d0 ArrayBlockingQueue

This branch includes the following new commits:

     new 71e71bc8f99 remove empty listeners
     new 6714acb68ae remove listenable logic
     new 5813c76c3d0 ArrayBlockingQueue

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 03/03: ArrayBlockingQueue

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch reduce-submit-self
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5813c76c3d0c905f188c2c7a77aebd6e4d7e2ba6
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed May 31 16:05:00 2023 +0800

    ArrayBlockingQueue
---
 .../db/pipe/task/queue/ListenableBoundedBlockingPendingQueue.java     | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBoundedBlockingPendingQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBoundedBlockingPendingQueue.java
index da91cc4187b..1614f738254 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBoundedBlockingPendingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBoundedBlockingPendingQueue.java
@@ -21,12 +21,12 @@ package org.apache.iotdb.db.pipe.task.queue;
 
 import org.apache.iotdb.pipe.api.event.Event;
 
-import org.eclipse.jetty.util.BlockingArrayQueue;
+import java.util.concurrent.ArrayBlockingQueue;
 
 public class ListenableBoundedBlockingPendingQueue<E extends Event>
     extends ListenableBlockingPendingQueue<E> {
 
   public ListenableBoundedBlockingPendingQueue(int pendingQueueSize) {
-    super(new BlockingArrayQueue<>(pendingQueueSize));
+    super(new ArrayBlockingQueue<>(pendingQueueSize));
   }
 }


[iotdb] 01/03: remove empty listeners

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch reduce-submit-self
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 71e71bc8f99822f6a3c1d39142bd59e9acc4102f
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed May 31 12:34:01 2023 +0800

    remove empty listeners
---
 .../manager/PipeConnectorSubtaskLifeCycle.java     | 10 --------
 .../task/queue/ListenableBlockingPendingQueue.java |  2 ++
 .../db/pipe/task/stage/PipeTaskProcessorStage.java | 29 +++++++++-------------
 3 files changed, 14 insertions(+), 27 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
index 8827bbb8d59..bec125eae9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
@@ -41,16 +41,6 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
     this.subtask = subtask;
     this.pendingQueue = pendingQueue;
 
-    pendingQueue.registerEmptyToNotEmptyListener(
-        subtask.getTaskID(),
-        () -> {
-          if (hasRunningTasks()) {
-            executor.start(subtask.getTaskID());
-          }
-        });
-    this.pendingQueue.registerNotEmptyToEmptyListener(
-        subtask.getTaskID(), () -> executor.stop(subtask.getTaskID()));
-
     runningTaskCount = 0;
     aliveTaskCount = 0;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
index 265bdf09234..8751ffc3f52 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
@@ -102,6 +102,7 @@ public abstract class ListenableBlockingPendingQueue<E extends Event> {
     fullToNotFullListeners
         .values()
         .forEach(PendingQueueFullToNotFullListener::onPendingQueueFullToNotFull);
+    LOGGER.info("notifyFullToNotFullListeners");
   }
 
   public ListenableBlockingPendingQueue<E> registerNotFullToFullListener(
@@ -118,6 +119,7 @@ public abstract class ListenableBlockingPendingQueue<E extends Event> {
     notFullToFullListeners
         .values()
         .forEach(PendingQueueNotFullToFullListener::onPendingQueueNotFullToFull);
+    LOGGER.info("notifyNotFullToFullListeners");
   }
 
   public boolean offer(E event) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 40a1b7d7be0..2e06808a8d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -37,10 +37,15 @@ import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfig
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 public class PipeTaskProcessorStage extends PipeTaskStage {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskProcessorStage.class);
+
   protected final PipeProcessorSubtaskExecutor executor =
       PipeSubtaskExecutorManager.getInstance().getProcessorSubtaskExecutor();
 
@@ -85,26 +90,15 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
             pipeConnectorOutputEventCollector);
 
     final PipeTaskStage pipeTaskStage = this;
-    this.pipeCollectorInputPendingQueue =
-        pipeCollectorInputPendingQueue != null
-            ? pipeCollectorInputPendingQueue
-                .registerEmptyToNotEmptyListener(
-                    taskId,
-                    () -> {
-                      // status can be changed by other threads calling pipeTaskStage's methods
-                      synchronized (pipeTaskStage) {
-                        if (status == PipeStatus.RUNNING) {
-                          executor.start(pipeProcessorSubtask.getTaskID());
-                        }
-                      }
-                    })
-                .registerNotEmptyToEmptyListener(
-                    taskId, () -> executor.stop(pipeProcessorSubtask.getTaskID()))
-            : null;
+    this.pipeCollectorInputPendingQueue = pipeCollectorInputPendingQueue;
     this.pipeConnectorOutputPendingQueue =
         pipeConnectorOutputPendingQueue
             .registerNotFullToFullListener(
-                taskId, () -> executor.stop(pipeProcessorSubtask.getTaskID()))
+                taskId,
+                () -> {
+                  executor.stop(pipeProcessorSubtask.getTaskID());
+                  LOGGER.warn("NotFullToFullListener", new Exception());
+                })
             .registerFullToNotFullListener(
                 taskId,
                 () -> {
@@ -114,6 +108,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
                     if (status == PipeStatus.RUNNING) {
                       pipeConnectorOutputEventCollector.tryCollectBufferedEvents();
                       executor.start(pipeProcessorSubtask.getTaskID());
+                      LOGGER.warn("FullToNotFullListener", new Exception());
                     }
                   }
                 });


[iotdb] 02/03: remove listenable logic

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch reduce-submit-self
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6714acb68aed5e37e09ba165a4722145e1af40bf
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed May 31 15:19:58 2023 +0800

    remove listenable logic
---
 .../PipeRealtimeDataRegionHybridCollector.java     |  13 ++-
 .../PipeRealtimeDataRegionLogCollector.java        |   4 +-
 .../PipeRealtimeDataRegionTsFileCollector.java     |   4 +-
 .../manager/PipeConnectorSubtaskLifeCycle.java     |   7 --
 .../event/view/collector/PipeEventCollector.java   |  11 --
 .../task/queue/ListenableBlockingPendingQueue.java | 116 ++-------------------
 .../db/pipe/task/stage/PipeTaskProcessorStage.java |  41 +-------
 7 files changed, 19 insertions(+), 177 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index 872b9c56f07..fbed165f7a4 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -39,7 +39,6 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeRealtimeDataRegionHybridCollector.class);
 
-  // TODO: memory control
   // This queue is used to store pending events collected by the method collect(). The method
   // supply() will poll events from this queue and send them to the next pipe plugin.
   private final ListenableUnboundedBlockingPendingQueue<Event> pendingQueue;
@@ -86,7 +85,14 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
     }
 
     if (!event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)) {
-      pendingQueue.offer(event);
+      if (!pendingQueue.offer(event)) {
+        LOGGER.warn(
+            String.format(
+                "collectTabletInsertion: pending queue of PipeRealtimeDataRegionHybridCollector %s has reached capacity, discard tablet event %s, current state %s",
+                this, event, event.getTsFileEpoch().getState(this)));
+        // this would not happen, but just in case.
+        // UnboundedBlockingPendingQueue is unbounded, so it should never reach capacity.
+      }
     }
   }
 
@@ -101,11 +107,10 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
     if (!pendingQueue.offer(event)) {
       LOGGER.warn(
           String.format(
-              "Pending Queue of Hybrid Realtime Collector %s has reached capacity, discard TsFile Event %s, current state %s",
+              "collectTsFileInsertion: pending queue of PipeRealtimeDataRegionHybridCollector %s has reached capacity, discard TsFile event %s, current state %s",
               this, event, event.getTsFileEpoch().getState(this)));
       // this would not happen, but just in case.
       // ListenableUnblockingPendingQueue is unbounded, so it should never reach capacity.
-      // TODO: memory control when elements in queue are too many.
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
index c7c96b650e4..99432deca13 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
@@ -36,7 +36,6 @@ public class PipeRealtimeDataRegionLogCollector extends PipeRealtimeDataRegionCo
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeRealtimeDataRegionLogCollector.class);
 
-  // TODO: memory control
   // This queue is used to store pending events collected by the method collect(). The method
   // supply() will poll events from this queue and send them to the next pipe plugin.
   private final ListenableUnboundedBlockingPendingQueue<Event> pendingQueue;
@@ -58,11 +57,10 @@ public class PipeRealtimeDataRegionLogCollector extends PipeRealtimeDataRegionCo
     if (!pendingQueue.offer(event)) {
       LOGGER.warn(
           String.format(
-              "Pending Queue of Log Realtime Collector %s has reached capacity, discard Tablet Event %s, current state %s",
+              "collect: pending queue of PipeRealtimeDataRegionLogCollector %s has reached capacity, discard tablet event %s, current state %s",
               this, event, event.getTsFileEpoch().getState(this)));
       // this would not happen, but just in case.
       // ListenableUnblockingPendingQueue is unbounded, so it should never reach capacity.
-      // TODO: memory control when elements in queue are too many.
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
index 42bec421eed..214c616441c 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
@@ -36,7 +36,6 @@ public class PipeRealtimeDataRegionTsFileCollector extends PipeRealtimeDataRegio
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeRealtimeDataRegionTsFileCollector.class);
 
-  // TODO: memory control
   // This queue is used to store pending events collected by the method collect(). The method
   // supply() will poll events from this queue and send them to the next pipe plugin.
   private final ListenableUnboundedBlockingPendingQueue<Event> pendingQueue;
@@ -58,11 +57,10 @@ public class PipeRealtimeDataRegionTsFileCollector extends PipeRealtimeDataRegio
     if (!pendingQueue.offer(event)) {
       LOGGER.warn(
           String.format(
-              "Pending Queue of TsFile Realtime Collector %s has reached capacity, discard TsFile Event %s, current state %s",
+              "collect: pending queue of PipeRealtimeDataRegionTsFileCollector %s has reached capacity, discard TsFile event %s, current state %s",
               this, event, event.getTsFileEpoch().getState(this)));
       // this would not happen, but just in case.
       // ListenableUnblockingPendingQueue is unbounded, so it should never reach capacity.
-      // TODO: memory control when elements in queue are too many.
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
index bec125eae9e..ce930159c52 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
@@ -103,13 +103,6 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
 
   @Override
   public synchronized void close() {
-    pendingQueue.removeEmptyToNotEmptyListener(subtask.getTaskID());
-    pendingQueue.removeNotEmptyToEmptyListener(subtask.getTaskID());
-
     executor.deregister(subtask.getTaskID());
   }
-
-  private synchronized boolean hasRunningTasks() {
-    return runningTaskCount > 0;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
index 6f2a7d82591..e39c16077a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
@@ -64,15 +64,4 @@ public class PipeEventCollector implements EventCollector {
       bufferQueue.offer(event);
     }
   }
-
-  public synchronized void tryCollectBufferedEvents() {
-    while (!bufferQueue.isEmpty()) {
-      final Event bufferedEvent = bufferQueue.peek();
-      if (pendingQueue.offer(bufferedEvent)) {
-        bufferQueue.poll();
-      } else {
-        return;
-      }
-    }
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
index 8751ffc3f52..04a89de9302 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
@@ -25,11 +25,8 @@ import org.apache.iotdb.pipe.api.event.Event;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 public abstract class ListenableBlockingPendingQueue<E extends Event> {
 
@@ -41,109 +38,22 @@ public abstract class ListenableBlockingPendingQueue<E extends Event> {
 
   private final BlockingQueue<E> pendingQueue;
 
-  private final Map<String, PendingQueueEmptyToNotEmptyListener> emptyToNotEmptyListeners =
-      new ConcurrentHashMap<>();
-  private final Map<String, PendingQueueNotEmptyToEmptyListener> notEmptyToEmptyListeners =
-      new ConcurrentHashMap<>();
-  private final Map<String, PendingQueueFullToNotFullListener> fullToNotFullListeners =
-      new ConcurrentHashMap<>();
-  private final Map<String, PendingQueueNotFullToFullListener> notFullToFullListeners =
-      new ConcurrentHashMap<>();
-
-  private final AtomicBoolean isFull = new AtomicBoolean(false);
-
   protected ListenableBlockingPendingQueue(BlockingQueue<E> pendingQueue) {
     this.pendingQueue = pendingQueue;
   }
 
-  public ListenableBlockingPendingQueue<E> registerEmptyToNotEmptyListener(
-      String id, PendingQueueEmptyToNotEmptyListener listener) {
-    emptyToNotEmptyListeners.put(id, listener);
-    return this;
-  }
-
-  public void removeEmptyToNotEmptyListener(String id) {
-    emptyToNotEmptyListeners.remove(id);
-  }
-
-  public void notifyEmptyToNotEmptyListeners() {
-    emptyToNotEmptyListeners
-        .values()
-        .forEach(PendingQueueEmptyToNotEmptyListener::onPendingQueueEmptyToNotEmpty);
-  }
-
-  public ListenableBlockingPendingQueue<E> registerNotEmptyToEmptyListener(
-      String id, PendingQueueNotEmptyToEmptyListener listener) {
-    notEmptyToEmptyListeners.put(id, listener);
-    return this;
-  }
-
-  public void removeNotEmptyToEmptyListener(String id) {
-    notEmptyToEmptyListeners.remove(id);
-  }
-
-  public void notifyNotEmptyToEmptyListeners() {
-    notEmptyToEmptyListeners
-        .values()
-        .forEach(PendingQueueNotEmptyToEmptyListener::onPendingQueueNotEmptyToEmpty);
-  }
-
-  public ListenableBlockingPendingQueue<E> registerFullToNotFullListener(
-      String id, PendingQueueFullToNotFullListener listener) {
-    fullToNotFullListeners.put(id, listener);
-    return this;
-  }
-
-  public void removeFullToNotFullListener(String id) {
-    fullToNotFullListeners.remove(id);
-  }
-
-  public void notifyFullToNotFullListeners() {
-    fullToNotFullListeners
-        .values()
-        .forEach(PendingQueueFullToNotFullListener::onPendingQueueFullToNotFull);
-    LOGGER.info("notifyFullToNotFullListeners");
-  }
-
-  public ListenableBlockingPendingQueue<E> registerNotFullToFullListener(
-      String id, PendingQueueNotFullToFullListener listener) {
-    notFullToFullListeners.put(id, listener);
-    return this;
-  }
-
-  public void removeNotFullToFullListener(String id) {
-    notFullToFullListeners.remove(id);
-  }
-
-  public void notifyNotFullToFullListeners() {
-    notFullToFullListeners
-        .values()
-        .forEach(PendingQueueNotFullToFullListener::onPendingQueueNotFullToFull);
-    LOGGER.info("notifyNotFullToFullListeners");
-  }
-
   public boolean offer(E event) {
-    final boolean isEmpty = pendingQueue.isEmpty();
-    final boolean isAdded = pendingQueue.offer(event);
-
-    if (isAdded) {
-      // we don't use size() == 1 to check whether the listener should be called,
-      // because offer() and size() are not atomic, and we don't want to use lock
-      // to make them atomic.
-      if (isEmpty) {
-        notifyEmptyToNotEmptyListeners();
-      }
-    } else {
-      if (isFull.compareAndSet(false, true)) {
-        notifyNotFullToFullListeners();
-      }
+    boolean isAdded = false;
+    try {
+      isAdded = pendingQueue.offer(event, MAX_BLOCKING_TIME_MS, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.info("pending queue offer is interrupted.", e);
+      Thread.currentThread().interrupt();
     }
-
     return isAdded;
   }
 
   public E poll() {
-    final boolean isEmpty = pendingQueue.isEmpty();
     E event = null;
     try {
       event = pendingQueue.poll(MAX_BLOCKING_TIME_MS, TimeUnit.MILLISECONDS);
@@ -151,20 +61,6 @@ public abstract class ListenableBlockingPendingQueue<E extends Event> {
       LOGGER.info("pending queue poll is interrupted.", e);
       Thread.currentThread().interrupt();
     }
-
-    if (event == null) {
-      // we don't use size() == 0 to check whether the listener should be called,
-      // because poll() and size() are not atomic, and we don't want to use lock
-      // to make them atomic.
-      if (!isEmpty) {
-        notifyNotEmptyToEmptyListeners();
-      }
-    } else {
-      if (isFull.compareAndSet(true, false)) {
-        notifyFullToNotFullListeners();
-      }
-    }
-
     return event;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 2e06808a8d3..04eedc94d4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.pipe.task.stage;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.core.event.view.collector.PipeEventCollector;
@@ -37,15 +36,10 @@ import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfig
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import javax.annotation.Nullable;
 
 public class PipeTaskProcessorStage extends PipeTaskStage {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskProcessorStage.class);
-
   protected final PipeProcessorSubtaskExecutor executor =
       PipeSubtaskExecutorManager.getInstance().getProcessorSubtaskExecutor();
 
@@ -89,29 +83,8 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
             pipeProcessor,
             pipeConnectorOutputEventCollector);
 
-    final PipeTaskStage pipeTaskStage = this;
     this.pipeCollectorInputPendingQueue = pipeCollectorInputPendingQueue;
-    this.pipeConnectorOutputPendingQueue =
-        pipeConnectorOutputPendingQueue
-            .registerNotFullToFullListener(
-                taskId,
-                () -> {
-                  executor.stop(pipeProcessorSubtask.getTaskID());
-                  LOGGER.warn("NotFullToFullListener", new Exception());
-                })
-            .registerFullToNotFullListener(
-                taskId,
-                () -> {
-                  // status can be changed by other threads calling pipeTaskStage's methods
-                  synchronized (pipeTaskStage) {
-                    // only start when the pipe is running
-                    if (status == PipeStatus.RUNNING) {
-                      pipeConnectorOutputEventCollector.tryCollectBufferedEvents();
-                      executor.start(pipeProcessorSubtask.getTaskID());
-                      LOGGER.warn("FullToNotFullListener", new Exception());
-                    }
-                  }
-                });
+    this.pipeConnectorOutputPendingQueue = pipeConnectorOutputPendingQueue;
   }
 
   @Override
@@ -144,16 +117,6 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
 
   @Override
   public void dropSubtask() throws PipeException {
-    final String taskId = pipeProcessorSubtask.getTaskID();
-
-    if (pipeCollectorInputPendingQueue != null) {
-      pipeCollectorInputPendingQueue.removeEmptyToNotEmptyListener(taskId);
-      pipeCollectorInputPendingQueue.removeNotEmptyToEmptyListener(taskId);
-    }
-
-    pipeConnectorOutputPendingQueue.removeNotFullToFullListener(taskId);
-    pipeConnectorOutputPendingQueue.removeFullToNotFullListener(taskId);
-
-    executor.deregister(taskId);
+    executor.deregister(pipeProcessorSubtask.getTaskID());
   }
 }