You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/31 08:05:27 UTC

[iotdb] 01/03: remove empty listeners

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch reduce-submit-self
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 71e71bc8f99822f6a3c1d39142bd59e9acc4102f
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed May 31 12:34:01 2023 +0800

    remove empty listeners
---
 .../manager/PipeConnectorSubtaskLifeCycle.java     | 10 --------
 .../task/queue/ListenableBlockingPendingQueue.java |  2 ++
 .../db/pipe/task/stage/PipeTaskProcessorStage.java | 29 +++++++++-------------
 3 files changed, 14 insertions(+), 27 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
index 8827bbb8d59..bec125eae9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
@@ -41,16 +41,6 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
     this.subtask = subtask;
     this.pendingQueue = pendingQueue;
 
-    pendingQueue.registerEmptyToNotEmptyListener(
-        subtask.getTaskID(),
-        () -> {
-          if (hasRunningTasks()) {
-            executor.start(subtask.getTaskID());
-          }
-        });
-    this.pendingQueue.registerNotEmptyToEmptyListener(
-        subtask.getTaskID(), () -> executor.stop(subtask.getTaskID()));
-
     runningTaskCount = 0;
     aliveTaskCount = 0;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
index 265bdf09234..8751ffc3f52 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
@@ -102,6 +102,7 @@ public abstract class ListenableBlockingPendingQueue<E extends Event> {
     fullToNotFullListeners
         .values()
         .forEach(PendingQueueFullToNotFullListener::onPendingQueueFullToNotFull);
+    LOGGER.info("notifyFullToNotFullListeners");
   }
 
   public ListenableBlockingPendingQueue<E> registerNotFullToFullListener(
@@ -118,6 +119,7 @@ public abstract class ListenableBlockingPendingQueue<E extends Event> {
     notFullToFullListeners
         .values()
         .forEach(PendingQueueNotFullToFullListener::onPendingQueueNotFullToFull);
+    LOGGER.info("notifyNotFullToFullListeners");
   }
 
   public boolean offer(E event) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 40a1b7d7be0..2e06808a8d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -37,10 +37,15 @@ import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfig
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 public class PipeTaskProcessorStage extends PipeTaskStage {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskProcessorStage.class);
+
   protected final PipeProcessorSubtaskExecutor executor =
       PipeSubtaskExecutorManager.getInstance().getProcessorSubtaskExecutor();
 
@@ -85,26 +90,15 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
             pipeConnectorOutputEventCollector);
 
     final PipeTaskStage pipeTaskStage = this;
-    this.pipeCollectorInputPendingQueue =
-        pipeCollectorInputPendingQueue != null
-            ? pipeCollectorInputPendingQueue
-                .registerEmptyToNotEmptyListener(
-                    taskId,
-                    () -> {
-                      // status can be changed by other threads calling pipeTaskStage's methods
-                      synchronized (pipeTaskStage) {
-                        if (status == PipeStatus.RUNNING) {
-                          executor.start(pipeProcessorSubtask.getTaskID());
-                        }
-                      }
-                    })
-                .registerNotEmptyToEmptyListener(
-                    taskId, () -> executor.stop(pipeProcessorSubtask.getTaskID()))
-            : null;
+    this.pipeCollectorInputPendingQueue = pipeCollectorInputPendingQueue;
     this.pipeConnectorOutputPendingQueue =
         pipeConnectorOutputPendingQueue
             .registerNotFullToFullListener(
-                taskId, () -> executor.stop(pipeProcessorSubtask.getTaskID()))
+                taskId,
+                () -> {
+                  executor.stop(pipeProcessorSubtask.getTaskID());
+                  LOGGER.warn("NotFullToFullListener", new Exception());
+                })
             .registerFullToNotFullListener(
                 taskId,
                 () -> {
@@ -114,6 +108,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
                     if (status == PipeStatus.RUNNING) {
                       pipeConnectorOutputEventCollector.tryCollectBufferedEvents();
                       executor.start(pipeProcessorSubtask.getTaskID());
+                      LOGGER.warn("FullToNotFullListener", new Exception());
                     }
                   }
                 });