You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/31 08:05:27 UTC
[iotdb] 01/03: remove empty listeners
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch reduce-submit-self
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 71e71bc8f99822f6a3c1d39142bd59e9acc4102f
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed May 31 12:34:01 2023 +0800
remove empty listeners
---
.../manager/PipeConnectorSubtaskLifeCycle.java | 10 --------
.../task/queue/ListenableBlockingPendingQueue.java | 2 ++
.../db/pipe/task/stage/PipeTaskProcessorStage.java | 29 +++++++++-------------
3 files changed, 14 insertions(+), 27 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
index 8827bbb8d59..bec125eae9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
@@ -41,16 +41,6 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
this.subtask = subtask;
this.pendingQueue = pendingQueue;
- pendingQueue.registerEmptyToNotEmptyListener(
- subtask.getTaskID(),
- () -> {
- if (hasRunningTasks()) {
- executor.start(subtask.getTaskID());
- }
- });
- this.pendingQueue.registerNotEmptyToEmptyListener(
- subtask.getTaskID(), () -> executor.stop(subtask.getTaskID()));
-
runningTaskCount = 0;
aliveTaskCount = 0;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
index 265bdf09234..8751ffc3f52 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
@@ -102,6 +102,7 @@ public abstract class ListenableBlockingPendingQueue<E extends Event> {
fullToNotFullListeners
.values()
.forEach(PendingQueueFullToNotFullListener::onPendingQueueFullToNotFull);
+ LOGGER.info("notifyFullToNotFullListeners");
}
public ListenableBlockingPendingQueue<E> registerNotFullToFullListener(
@@ -118,6 +119,7 @@ public abstract class ListenableBlockingPendingQueue<E extends Event> {
notFullToFullListeners
.values()
.forEach(PendingQueueNotFullToFullListener::onPendingQueueNotFullToFull);
+ LOGGER.info("notifyNotFullToFullListeners");
}
public boolean offer(E event) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 40a1b7d7be0..2e06808a8d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -37,10 +37,15 @@ import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfig
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
public class PipeTaskProcessorStage extends PipeTaskStage {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskProcessorStage.class);
+
protected final PipeProcessorSubtaskExecutor executor =
PipeSubtaskExecutorManager.getInstance().getProcessorSubtaskExecutor();
@@ -85,26 +90,15 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
pipeConnectorOutputEventCollector);
final PipeTaskStage pipeTaskStage = this;
- this.pipeCollectorInputPendingQueue =
- pipeCollectorInputPendingQueue != null
- ? pipeCollectorInputPendingQueue
- .registerEmptyToNotEmptyListener(
- taskId,
- () -> {
- // status can be changed by other threads calling pipeTaskStage's methods
- synchronized (pipeTaskStage) {
- if (status == PipeStatus.RUNNING) {
- executor.start(pipeProcessorSubtask.getTaskID());
- }
- }
- })
- .registerNotEmptyToEmptyListener(
- taskId, () -> executor.stop(pipeProcessorSubtask.getTaskID()))
- : null;
+ this.pipeCollectorInputPendingQueue = pipeCollectorInputPendingQueue;
this.pipeConnectorOutputPendingQueue =
pipeConnectorOutputPendingQueue
.registerNotFullToFullListener(
- taskId, () -> executor.stop(pipeProcessorSubtask.getTaskID()))
+ taskId,
+ () -> {
+ executor.stop(pipeProcessorSubtask.getTaskID());
+ LOGGER.warn("NotFullToFullListener", new Exception());
+ })
.registerFullToNotFullListener(
taskId,
() -> {
@@ -114,6 +108,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
if (status == PipeStatus.RUNNING) {
pipeConnectorOutputEventCollector.tryCollectBufferedEvents();
executor.start(pipeProcessorSubtask.getTaskID());
+ LOGGER.warn("FullToNotFullListener", new Exception());
}
}
});