You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/31 09:40:19 UTC
[iotdb] branch reduce-submit-self updated: remove listenable logic
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch reduce-submit-self
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/reduce-submit-self by this push:
new 3372fd19736 remove listenable logic
3372fd19736 is described below
commit 3372fd19736cf8bd2350693c45ba34aefb2a9b10
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed May 31 17:40:03 2023 +0800
remove listenable logic
---
.../core/collector/IoTDBDataRegionCollector.java | 6 ++---
.../PipeRealtimeDataRegionHybridCollector.java | 6 ++---
.../PipeRealtimeDataRegionLogCollector.java | 6 ++---
.../PipeRealtimeDataRegionTsFileCollector.java | 6 ++---
.../manager/PipeConnectorSubtaskLifeCycle.java | 8 +++----
.../manager/PipeConnectorSubtaskManager.java | 8 +++----
.../event/view/collector/PipeEventCollector.java | 6 ++---
...PendingQueue.java => BlockingPendingQueue.java} | 7 +++---
...Queue.java => BoundedBlockingPendingQueue.java} | 5 ++---
.../queue/PendingQueueEmptyToNotEmptyListener.java | 26 ----------------------
.../queue/PendingQueueFullToNotFullListener.java | 26 ----------------------
.../queue/PendingQueueNotEmptyToEmptyListener.java | 26 ----------------------
.../queue/PendingQueueNotFullToFullListener.java | 26 ----------------------
...eue.java => UnboundedBlockingPendingQueue.java} | 5 ++---
.../db/pipe/task/stage/PipeTaskCollectorStage.java | 8 +++----
.../db/pipe/task/stage/PipeTaskConnectorStage.java | 4 ++--
.../db/pipe/task/stage/PipeTaskProcessorStage.java | 12 +++++-----
.../db/pipe/task/subtask/PipeConnectorSubtask.java | 6 ++---
.../core/collector/PipeRealtimeCollectTest.java | 13 +++++------
.../executor/PipeConnectorSubtaskExecutorTest.java | 4 ++--
20 files changed, 52 insertions(+), 162 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
index 37a9c5101a9..757599f2b24 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionFa
import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionLogCollector;
import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionTsFileCollector;
-import org.apache.iotdb.db.pipe.task.queue.ListenableUnboundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.UnboundedBlockingPendingQueue;
import org.apache.iotdb.pipe.api.PipeCollector;
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
@@ -59,7 +59,7 @@ public class IoTDBDataRegionCollector implements PipeCollector {
private final PipeTaskMeta pipeTaskMeta;
private final long creationTime;
- private final ListenableUnboundedBlockingPendingQueue<Event> collectorPendingQueue;
+ private final UnboundedBlockingPendingQueue<Event> collectorPendingQueue;
// TODO: support pattern in historical collector
private PipeHistoricalDataRegionCollector historicalCollector;
@@ -70,7 +70,7 @@ public class IoTDBDataRegionCollector implements PipeCollector {
public IoTDBDataRegionCollector(
PipeTaskMeta pipeTaskMeta,
long creationTime,
- ListenableUnboundedBlockingPendingQueue<Event> collectorPendingQueue) {
+ UnboundedBlockingPendingQueue<Event> collectorPendingQueue) {
this.hasBeenStarted = new AtomicBoolean(false);
this.pipeTaskMeta = pipeTaskMeta;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index fbed165f7a4..493240455aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
-import org.apache.iotdb.db.pipe.task.queue.ListenableUnboundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.UnboundedBlockingPendingQueue;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -41,10 +41,10 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
// This queue is used to store pending events collected by the method collect(). The method
// supply() will poll events from this queue and send them to the next pipe plugin.
- private final ListenableUnboundedBlockingPendingQueue<Event> pendingQueue;
+ private final UnboundedBlockingPendingQueue<Event> pendingQueue;
public PipeRealtimeDataRegionHybridCollector(
- PipeTaskMeta pipeTaskMeta, ListenableUnboundedBlockingPendingQueue<Event> pendingQueue) {
+ PipeTaskMeta pipeTaskMeta, UnboundedBlockingPendingQueue<Event> pendingQueue) {
super(pipeTaskMeta);
this.pendingQueue = pendingQueue;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
index 99432deca13..e6835187cfc 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
-import org.apache.iotdb.db.pipe.task.queue.ListenableUnboundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.UnboundedBlockingPendingQueue;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
@@ -38,10 +38,10 @@ public class PipeRealtimeDataRegionLogCollector extends PipeRealtimeDataRegionCo
// This queue is used to store pending events collected by the method collect(). The method
// supply() will poll events from this queue and send them to the next pipe plugin.
- private final ListenableUnboundedBlockingPendingQueue<Event> pendingQueue;
+ private final UnboundedBlockingPendingQueue<Event> pendingQueue;
public PipeRealtimeDataRegionLogCollector(
- PipeTaskMeta pipeTaskMeta, ListenableUnboundedBlockingPendingQueue<Event> pendingQueue) {
+ PipeTaskMeta pipeTaskMeta, UnboundedBlockingPendingQueue<Event> pendingQueue) {
super(pipeTaskMeta);
this.pendingQueue = pendingQueue;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
index 214c616441c..1b00fdd974e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
-import org.apache.iotdb.db.pipe.task.queue.ListenableUnboundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.UnboundedBlockingPendingQueue;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
@@ -38,10 +38,10 @@ public class PipeRealtimeDataRegionTsFileCollector extends PipeRealtimeDataRegio
// This queue is used to store pending events collected by the method collect(). The method
// supply() will poll events from this queue and send them to the next pipe plugin.
- private final ListenableUnboundedBlockingPendingQueue<Event> pendingQueue;
+ private final UnboundedBlockingPendingQueue<Event> pendingQueue;
public PipeRealtimeDataRegionTsFileCollector(
- PipeTaskMeta pipeTaskMeta, ListenableUnboundedBlockingPendingQueue<Event> pendingQueue) {
+ PipeTaskMeta pipeTaskMeta, UnboundedBlockingPendingQueue<Event> pendingQueue) {
super(pipeTaskMeta);
this.pendingQueue = pendingQueue;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
index ce930159c52..6936e4610cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.pipe.core.connector.manager;
import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
-import org.apache.iotdb.db.pipe.task.queue.ListenableBoundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.BoundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
import org.apache.iotdb.pipe.api.event.Event;
@@ -28,7 +28,7 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
private final PipeConnectorSubtaskExecutor executor;
private final PipeConnectorSubtask subtask;
- private final ListenableBoundedBlockingPendingQueue<Event> pendingQueue;
+ private final BoundedBlockingPendingQueue<Event> pendingQueue;
private int runningTaskCount;
private int aliveTaskCount;
@@ -36,7 +36,7 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
public PipeConnectorSubtaskLifeCycle(
PipeConnectorSubtaskExecutor executor,
PipeConnectorSubtask subtask,
- ListenableBoundedBlockingPendingQueue<Event> pendingQueue) {
+ BoundedBlockingPendingQueue<Event> pendingQueue) {
this.executor = executor;
this.subtask = subtask;
this.pendingQueue = pendingQueue;
@@ -49,7 +49,7 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
return subtask;
}
- public ListenableBoundedBlockingPendingQueue<Event> getPendingQueue() {
+ public BoundedBlockingPendingQueue<Event> getPendingQueue() {
return pendingQueue;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
index 87fd30b5a30..94b2975358c 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.config.PipeConnectorConstant;
import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.IoTDBThriftConnectorV1;
import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
-import org.apache.iotdb.db.pipe.task.queue.ListenableBoundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.BoundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
@@ -77,8 +77,8 @@ public class PipeConnectorSubtaskManager {
}
// 2. construct PipeConnectorSubtaskLifeCycle to manage PipeConnectorSubtask's life cycle
- final ListenableBoundedBlockingPendingQueue<Event> pendingQueue =
- new ListenableBoundedBlockingPendingQueue<>(
+ final BoundedBlockingPendingQueue<Event> pendingQueue =
+ new BoundedBlockingPendingQueue<>(
PipeConfig.getInstance().getPipeConnectorPendingQueueSize());
final PipeConnectorSubtask pipeConnectorSubtask =
new PipeConnectorSubtask(attributeSortedString, taskMeta, pendingQueue, pipeConnector);
@@ -131,7 +131,7 @@ public class PipeConnectorSubtaskManager {
return attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).getSubtask();
}
- public ListenableBoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
+ public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
String attributeSortedString) {
if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
throw new PipeException(
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
index e39c16077a5..6a8697eec8b 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.pipe.core.event.view.collector;
import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
-import org.apache.iotdb.db.pipe.task.queue.ListenableBoundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.BoundedBlockingPendingQueue;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.event.Event;
@@ -29,7 +29,7 @@ import java.util.Queue;
public class PipeEventCollector implements EventCollector {
- private final ListenableBoundedBlockingPendingQueue<Event> pendingQueue;
+ private final BoundedBlockingPendingQueue<Event> pendingQueue;
// buffer queue is used to store events that are not offered to pending queue
// because the pending queue is full. when pending queue is full, pending queue
@@ -39,7 +39,7 @@ public class PipeEventCollector implements EventCollector {
// events before events in buffer queue are offered to pending queue.
private final Queue<Event> bufferQueue;
- public PipeEventCollector(ListenableBoundedBlockingPendingQueue<Event> pendingQueue) {
+ public PipeEventCollector(BoundedBlockingPendingQueue<Event> pendingQueue) {
this.pendingQueue = pendingQueue;
bufferQueue = new LinkedList<>();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/BlockingPendingQueue.java
similarity index 89%
rename from server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/BlockingPendingQueue.java
index 04a89de9302..07465e5fa36 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/BlockingPendingQueue.java
@@ -28,17 +28,16 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
-public abstract class ListenableBlockingPendingQueue<E extends Event> {
+public abstract class BlockingPendingQueue<E extends Event> {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(ListenableBlockingPendingQueue.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(BlockingPendingQueue.class);
private static final long MAX_BLOCKING_TIME_MS =
PipeConfig.getInstance().getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs();
private final BlockingQueue<E> pendingQueue;
- protected ListenableBlockingPendingQueue(BlockingQueue<E> pendingQueue) {
+ protected BlockingPendingQueue(BlockingQueue<E> pendingQueue) {
this.pendingQueue = pendingQueue;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBoundedBlockingPendingQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/BoundedBlockingPendingQueue.java
similarity index 84%
rename from server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBoundedBlockingPendingQueue.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/BoundedBlockingPendingQueue.java
index 1614f738254..0940fee2753 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBoundedBlockingPendingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/BoundedBlockingPendingQueue.java
@@ -23,10 +23,9 @@ import org.apache.iotdb.pipe.api.event.Event;
import java.util.concurrent.ArrayBlockingQueue;
-public class ListenableBoundedBlockingPendingQueue<E extends Event>
- extends ListenableBlockingPendingQueue<E> {
+public class BoundedBlockingPendingQueue<E extends Event> extends BlockingPendingQueue<E> {
- public ListenableBoundedBlockingPendingQueue(int pendingQueueSize) {
+ public BoundedBlockingPendingQueue(int pendingQueueSize) {
super(new ArrayBlockingQueue<>(pendingQueueSize));
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueEmptyToNotEmptyListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueEmptyToNotEmptyListener.java
deleted file mode 100644
index d56b6e789ee..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueEmptyToNotEmptyListener.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.task.queue;
-
-@FunctionalInterface
-public interface PendingQueueEmptyToNotEmptyListener {
-
- void onPendingQueueEmptyToNotEmpty();
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueFullToNotFullListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueFullToNotFullListener.java
deleted file mode 100644
index 33225505f71..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueFullToNotFullListener.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.task.queue;
-
-@FunctionalInterface
-public interface PendingQueueFullToNotFullListener {
-
- void onPendingQueueFullToNotFull();
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotEmptyToEmptyListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotEmptyToEmptyListener.java
deleted file mode 100644
index 42257837391..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotEmptyToEmptyListener.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.task.queue;
-
-@FunctionalInterface
-public interface PendingQueueNotEmptyToEmptyListener {
-
- void onPendingQueueNotEmptyToEmpty();
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotFullToFullListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotFullToFullListener.java
deleted file mode 100644
index 2433cd4b8de..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotFullToFullListener.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.task.queue;
-
-@FunctionalInterface
-public interface PendingQueueNotFullToFullListener {
-
- void onPendingQueueNotFullToFull();
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableUnboundedBlockingPendingQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/UnboundedBlockingPendingQueue.java
similarity index 85%
rename from server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableUnboundedBlockingPendingQueue.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/UnboundedBlockingPendingQueue.java
index 63611067190..3a94393dd08 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableUnboundedBlockingPendingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/UnboundedBlockingPendingQueue.java
@@ -23,10 +23,9 @@ import org.apache.iotdb.pipe.api.event.Event;
import java.util.concurrent.LinkedBlockingQueue;
-public class ListenableUnboundedBlockingPendingQueue<E extends Event>
- extends ListenableBlockingPendingQueue<E> {
+public class UnboundedBlockingPendingQueue<E extends Event> extends BlockingPendingQueue<E> {
- public ListenableUnboundedBlockingPendingQueue() {
+ public UnboundedBlockingPendingQueue() {
super(new LinkedBlockingQueue<>());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index 60488c6d253..a73a9da8b9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
import org.apache.iotdb.db.pipe.core.collector.IoTDBDataRegionCollector;
import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
-import org.apache.iotdb.db.pipe.task.queue.ListenableUnboundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.UnboundedBlockingPendingQueue;
import org.apache.iotdb.pipe.api.PipeCollector;
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
@@ -51,7 +51,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
* processing, and it also can notify the PipeTaskProcessorStage to start processing data when the
* queue is not empty.
*/
- private ListenableUnboundedBlockingPendingQueue<Event> collectorPendingQueue;
+ private UnboundedBlockingPendingQueue<Event> collectorPendingQueue;
private final PipeCollector pipeCollector;
@@ -77,7 +77,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
.getAttribute()
.put(PipeCollectorConstant.DATA_REGION_KEY, String.valueOf(dataRegionId.getId()));
- collectorPendingQueue = new ListenableUnboundedBlockingPendingQueue<>();
+ collectorPendingQueue = new UnboundedBlockingPendingQueue<>();
this.pipeCollector =
new IoTDBDataRegionCollector(pipeTaskMeta, creationTime, collectorPendingQueue);
} else {
@@ -130,7 +130,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
return pipeCollector::supply;
}
- public ListenableUnboundedBlockingPendingQueue<Event> getCollectorPendingQueue() {
+ public UnboundedBlockingPendingQueue<Event> getCollectorPendingQueue() {
return collectorPendingQueue;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index 98e8ba7e965..5e43b1a2d18 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.pipe.task.stage;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.core.connector.manager.PipeConnectorSubtaskManager;
import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
-import org.apache.iotdb.db.pipe.task.queue.ListenableBoundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.BoundedBlockingPendingQueue;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -61,7 +61,7 @@ public class PipeTaskConnectorStage extends PipeTaskStage {
PipeConnectorSubtaskManager.instance().deregister(connectorSubtaskId);
}
- public ListenableBoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
+ public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
return PipeConnectorSubtaskManager.instance().getPipeConnectorPendingQueue(connectorSubtaskId);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 04eedc94d4a..c0bad32a3f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -25,9 +25,9 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.core.event.view.collector.PipeEventCollector;
import org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
+import org.apache.iotdb.db.pipe.task.queue.BlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.BoundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
-import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
-import org.apache.iotdb.db.pipe.task.queue.ListenableBoundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
@@ -47,8 +47,8 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
protected final PipeProcessor pipeProcessor;
protected final PipeProcessorSubtask pipeProcessorSubtask;
- protected final ListenableBlockingPendingQueue<Event> pipeCollectorInputPendingQueue;
- protected final ListenableBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue;
+ protected final BlockingPendingQueue<Event> pipeCollectorInputPendingQueue;
+ protected final BlockingPendingQueue<Event> pipeConnectorOutputPendingQueue;
/**
* @param pipeName pipe name
@@ -65,9 +65,9 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
TConsensusGroupId dataRegionId,
PipeTaskMeta taskMeta,
EventSupplier pipeCollectorInputEventSupplier,
- @Nullable ListenableBlockingPendingQueue<Event> pipeCollectorInputPendingQueue,
+ @Nullable BlockingPendingQueue<Event> pipeCollectorInputPendingQueue,
PipeParameters pipeProcessorParameters,
- ListenableBoundedBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue) {
+ BoundedBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue) {
this.pipeProcessorParameters = pipeProcessorParameters;
final String taskId = pipeName + "_" + dataRegionId;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index 84a4817d733..12802d88e5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.pipe.task.subtask;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.task.queue.ListenableBoundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.BoundedBlockingPendingQueue;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -39,7 +39,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeConnectorSubtask.class);
- private final ListenableBoundedBlockingPendingQueue<Event> inputPendingQueue;
+ private final BoundedBlockingPendingQueue<Event> inputPendingQueue;
private final PipeConnector outputPipeConnector;
private static final int HEARTBEAT_CHECK_INTERVAL = 1000;
@@ -49,7 +49,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
public PipeConnectorSubtask(
String taskID,
PipeTaskMeta taskMeta,
- ListenableBoundedBlockingPendingQueue<Event> inputPendingQueue,
+ BoundedBlockingPendingQueue<Event> inputPendingQueue,
PipeConnector outputPipeConnector) {
super(taskID, taskMeta);
this.inputPendingQueue = inputPendingQueue;
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index 27d28114007..d071c1ae5ef 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
import org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
-import org.apache.iotdb.db.pipe.task.queue.ListenableUnboundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.UnboundedBlockingPendingQueue;
import org.apache.iotdb.db.wal.utils.WALEntryHandler;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
@@ -102,17 +102,14 @@ public class PipeRealtimeCollectTest {
// set up realtime collector
try (PipeRealtimeDataRegionHybridCollector collector1 =
- new PipeRealtimeDataRegionHybridCollector(
- null, new ListenableUnboundedBlockingPendingQueue<>());
+ new PipeRealtimeDataRegionHybridCollector(null, new UnboundedBlockingPendingQueue<>());
PipeRealtimeDataRegionHybridCollector collector2 =
- new PipeRealtimeDataRegionHybridCollector(
- null, new ListenableUnboundedBlockingPendingQueue<>());
+ new PipeRealtimeDataRegionHybridCollector(null, new UnboundedBlockingPendingQueue<>());
PipeRealtimeDataRegionHybridCollector collector3 =
- new PipeRealtimeDataRegionHybridCollector(
- null, new ListenableUnboundedBlockingPendingQueue<>());
+ new PipeRealtimeDataRegionHybridCollector(null, new UnboundedBlockingPendingQueue<>());
PipeRealtimeDataRegionHybridCollector collector4 =
new PipeRealtimeDataRegionHybridCollector(
- null, new ListenableUnboundedBlockingPendingQueue<>())) {
+ null, new UnboundedBlockingPendingQueue<>())) {
collector1.customize(
new PipeParameters(
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
index 43d1f2eefb8..c01d6f51b39 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.pipe.execution.executor;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
-import org.apache.iotdb.db.pipe.task.queue.ListenableBoundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.BoundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -40,7 +40,7 @@ public class PipeConnectorSubtaskExecutorTest extends PipeSubtaskExecutorTest {
new PipeConnectorSubtask(
"PipeConnectorSubtaskExecutorTest",
mock(PipeTaskMeta.class),
- mock(ListenableBoundedBlockingPendingQueue.class),
+ mock(BoundedBlockingPendingQueue.class),
mock(PipeConnector.class)));
}
}