You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/10 04:46:10 UTC
[iotdb] branch master updated: [IOTDB-5848] Pipe: task construction and life cycle management (#9808)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 15a3adc2eb [IOTDB-5848] Pipe: task construction and life cycle management (#9808)
15a3adc2eb is described below
commit 15a3adc2eb2ea6c341a192b5cc75866d35aca75b
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed May 10 12:46:01 2023 +0800
[IOTDB-5848] Pipe: task construction and life cycle management (#9808)
* introduce reconnect strategy when connection broken.
* handle start / stop / create / drop pipe request by managing local tasks on data nodes.
* provide a mechanism to handle idle and backpressure of pipe tasks.
---
.../commons/pipe/task/meta/PipeStaticMeta.java | 4 +-
.../api/exception/PipeConnectionException.java | 18 +--
.../db/pipe/agent/plugin/PipePluginAgent.java | 25 +++-
.../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 139 +++++++++---------
.../db/pipe/config/PipeCollectorConstant.java | 3 +-
...torConstant.java => PipeConnectorConstant.java} | 7 +-
...torConstant.java => PipeProcessorConstant.java} | 7 +-
.../core/collector/IoTDBDataRegionCollector.java | 5 +-
.../PipeHistoricalDataRegionTsFileCollector.java | 4 +-
.../realtime/PipeRealtimeDataRegionCollector.java | 4 +-
.../PipeRealtimeDataRegionHybridCollector.java | 22 ++-
.../connector/PipeConnectorSubtaskLifeCycle.java | 31 +++-
.../connector/PipeConnectorSubtaskManager.java | 19 ++-
.../event/view/collector/PipeEventCollector.java | 59 +++++++-
...anager.java => PipeSubtaskExecutorManager.java} | 12 +-
.../execution/scheduler/PipeTaskScheduler.java | 18 +--
.../org/apache/iotdb/db/pipe/task/PipeBuilder.java | 66 +++++++++
.../org/apache/iotdb/db/pipe/task/PipeTask.java | 9 +-
.../apache/iotdb/db/pipe/task/PipeTaskBuilder.java | 60 +++++++-
.../apache/iotdb/db/pipe/task/PipeTaskManager.java | 96 +++++++++++++
.../EventSupplier.java} | 24 ++--
.../queue/ListenableBlockingPendingQueue.java} | 21 +--
.../db/pipe/task/queue/ListenablePendingQueue.java | 159 +++++++++++++++++++++
.../ListenableUnblockingPendingQueue.java} | 18 +--
.../PendingQueueEmptyToNotEmptyListener.java} | 9 +-
.../PendingQueueFullToNotFullListener.java} | 9 +-
.../PendingQueueNotEmptyToEmptyListener.java} | 9 +-
.../PendingQueueNotFullToFullListener.java} | 9 +-
.../db/pipe/task/stage/PipeTaskCollectorStage.java | 55 +++++--
.../db/pipe/task/stage/PipeTaskConnectorStage.java | 100 +++----------
.../db/pipe/task/stage/PipeTaskProcessorStage.java | 105 +++++++++++---
.../iotdb/db/pipe/task/stage/PipeTaskStage.java | 95 ++++++++++--
.../db/pipe/task/subtask/PipeConnectorSubtask.java | 87 ++++++++---
.../db/pipe/task/subtask/PipeProcessorSubtask.java | 20 +--
.../iotdb/db/pipe/task/subtask/PipeSubtask.java | 6 +-
.../collector/CachedSchemaPatternMatcherTest.java | 6 +-
.../core/collector/PipeRealtimeCollectTest.java | 17 +--
.../executor/PipeConnectorSubtaskExecutorTest.java | 8 +-
.../executor/PipeProcessorSubtaskExecutorTest.java | 10 +-
39 files changed, 1009 insertions(+), 366 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
index eab7408386..f487de445c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
@@ -28,6 +28,7 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
public class PipeStaticMeta {
@@ -159,7 +160,8 @@ public class PipeStaticMeta {
@Override
public int hashCode() {
- return pipeName.hashCode();
+ return Objects.hash(
+ pipeName, creationTime, collectorParameters, processorParameters, connectorParameters);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java
similarity index 74%
copy from server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java
copy to pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java
index 39ffaefeeb..14cacc73fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java
@@ -17,21 +17,15 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.task.subtask;
+package org.apache.iotdb.pipe.api.exception;
-public class PipeAssignerSubtask extends PipeSubtask {
+public class PipeConnectionException extends PipeException {
- public PipeAssignerSubtask(String taskID) {
- super(taskID);
+ public PipeConnectionException(String message) {
+ super(message);
}
- @Override
- protected void executeForAWhile() {
- // do nothing
- }
-
- @Override
- public void close() {
- // TODO
+ public PipeConnectionException(String message, Throwable cause) {
+ super(message, cause);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index d0b42f6fa2..3474edddf3 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -19,12 +19,15 @@
package org.apache.iotdb.db.pipe.agent.plugin;
+import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoader;
import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
import org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
-import org.apache.iotdb.db.pipe.core.collector.IoTDBDataRegionCollector;
+import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.PipeConnectorConstant;
+import org.apache.iotdb.db.pipe.config.PipeProcessorConstant;
import org.apache.iotdb.pipe.api.PipeCollector;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.PipePlugin;
@@ -191,16 +194,28 @@ public class PipePluginAgent {
}
public PipeCollector reflectCollector(PipeParameters collectorParameters) {
- return new IoTDBDataRegionCollector(); // TODO: reflect plugin, use PipeIoTDBCollector as
- // default collector
+ return (PipeCollector)
+ reflect(
+ collectorParameters.getStringOrDefault(
+ PipeCollectorConstant.COLLECTOR_KEY,
+ BuiltinPipePlugin.DEFAULT_COLLECTOR.getPipePluginName()));
}
public PipeProcessor reflectProcessor(PipeParameters processorParameters) {
- throw new UnsupportedOperationException("Not supported yet.");
+ return (PipeProcessor)
+ reflect(
+ processorParameters.getStringOrDefault(
+ PipeProcessorConstant.PROCESSOR_KEY,
+ BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName()));
}
public PipeConnector reflectConnector(PipeParameters connectorParameters) {
- throw new UnsupportedOperationException("Not supported yet.");
+ if (!connectorParameters.hasAttribute(PipeConnectorConstant.CONNECTOR_KEY)) {
+ throw new PipeManagementException(
+ "Failed to reflect PipeConnector instance because 'connector' is not specified in the parameters.");
+ }
+ return (PipeConnector)
+ reflect(connectorParameters.getString(PipeConnectorConstant.CONNECTOR_KEY));
}
private PipePlugin reflect(String pluginName) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index c0df1e1761..13cb1f5ab5 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -23,11 +23,15 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.pipe.task.PipeBuilder;
+import org.apache.iotdb.db.pipe.task.PipeTask;
+import org.apache.iotdb.db.pipe.task.PipeTaskManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+
/**
* State transition diagram of a pipe task:
*
@@ -46,14 +50,16 @@ public class PipeTaskAgent {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskAgent.class);
private final PipeMetaKeeper pipeMetaKeeper;
+ private final PipeTaskManager pipeTaskManager;
public PipeTaskAgent() {
pipeMetaKeeper = new PipeMetaKeeper();
+ pipeTaskManager = new PipeTaskManager();
}
////////////////////////// Pipe Task Management //////////////////////////
- public void createPipe(PipeMeta pipeMeta) {
+ public synchronized void createPipe(PipeMeta pipeMeta) {
final String pipeName = pipeMeta.getStaticMeta().getPipeName();
final long creationTime = pipeMeta.getStaticMeta().getCreationTime();
@@ -91,28 +97,20 @@ public class PipeTaskAgent {
dropPipe(pipeName, existedPipeMeta.getStaticMeta().getCreationTime());
}
- // build pipe task by consensus group
- pipeMeta
- .getRuntimeMeta()
- .getConsensusGroupIdToTaskMetaMap()
- .forEach(
- ((consensusGroupId, pipeTaskMeta) -> {
- createPipeTaskByConsensusGroup(
- pipeName, creationTime, consensusGroupId, pipeTaskMeta);
- }));
+ // create pipe tasks and trigger create() method for each pipe task
+ final Map<TConsensusGroupId, PipeTask> pipeTasks = new PipeBuilder(pipeMeta).build();
+ for (PipeTask pipeTask : pipeTasks.values()) {
+ pipeTask.create();
+ }
+ pipeTaskManager.addPipeTasks(pipeMeta.getStaticMeta(), pipeTasks);
+
// add pipe meta to pipe meta keeper
// note that we do not need to set the status of pipe meta here, because the status of pipe meta
// is already set to STOPPED when it is created
pipeMetaKeeper.addPipeMeta(pipeName, pipeMeta);
}
- public void createPipeTaskByConsensusGroup(
- String pipeName,
- long creationTime,
- TConsensusGroupId consensusGroupId,
- PipeTaskMeta pipeTaskMeta) {}
-
- public void dropPipe(String pipeName, long creationTime) {
+ public synchronized void dropPipe(String pipeName, long creationTime) {
final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
if (existedPipeMeta == null) {
@@ -135,22 +133,26 @@ public class PipeTaskAgent {
// but the pipe task meta has not been cleaned up (in case of failure when executing
// dropPipeTaskByConsensusGroup).
existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
- // drop pipe task by consensus group
- existedPipeMeta
- .getRuntimeMeta()
- .getConsensusGroupIdToTaskMetaMap()
- .forEach(
- ((consensusGroupId, pipeTaskMeta) -> {
- dropPipeTaskByConsensusGroup(pipeName, creationTime, consensusGroupId);
- }));
+
+ // drop pipe tasks and trigger drop() method for each pipe task
+ final Map<TConsensusGroupId, PipeTask> pipeTasks =
+ pipeTaskManager.removePipeTasks(existedPipeMeta.getStaticMeta());
+ if (pipeTasks == null) {
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has already been dropped or has not been created. Skip dropping.",
+ pipeName,
+ creationTime);
+ return;
+ }
+ for (PipeTask pipeTask : pipeTasks.values()) {
+ pipeTask.drop();
+ }
+
// remove pipe meta from pipe meta keeper
pipeMetaKeeper.removePipeMeta(pipeName);
}
- public void dropPipeTaskByConsensusGroup(
- String pipeName, long creationTime, TConsensusGroupId consensusGroupId) {}
-
- public void dropPipe(String pipeName) {
+ public synchronized void dropPipe(String pipeName) {
final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
if (existedPipeMeta == null) {
@@ -163,21 +165,24 @@ public class PipeTaskAgent {
// but the pipe task meta has not been cleaned up (in case of failure when executing
// dropPipeTaskByConsensusGroup).
existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
- // drop pipe task by consensus group
- existedPipeMeta
- .getRuntimeMeta()
- .getConsensusGroupIdToTaskMetaMap()
- .forEach(
- ((consensusGroupId, pipeTaskMeta) -> {
- dropPipeTaskByConsensusGroup(pipeName, consensusGroupId);
- }));
+
+ // drop pipe tasks and trigger drop() method for each pipe task
+ final Map<TConsensusGroupId, PipeTask> pipeTasks =
+ pipeTaskManager.removePipeTasks(existedPipeMeta.getStaticMeta());
+ if (pipeTasks == null) {
+ LOGGER.info(
+ "Pipe {} has already been dropped or has not been created. Skip dropping.", pipeName);
+ return;
+ }
+ for (PipeTask pipeTask : pipeTasks.values()) {
+ pipeTask.drop();
+ }
+
// remove pipe meta from pipe meta keeper
pipeMetaKeeper.removePipeMeta(pipeName);
}
- public void dropPipeTaskByConsensusGroup(String pipeName, TConsensusGroupId consensusGroupId) {}
-
- public void startPipe(String pipeName, long creationTime) {
+ public synchronized void startPipe(String pipeName, long creationTime) {
final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
if (existedPipeMeta == null) {
@@ -223,22 +228,25 @@ public class PipeTaskAgent {
"Unexpected status: " + existedPipeMeta.getRuntimeMeta().getStatus().get().name());
}
- // start pipe task by consensus group
- existedPipeMeta
- .getRuntimeMeta()
- .getConsensusGroupIdToTaskMetaMap()
- .forEach(
- ((consensusGroupId, pipeTaskMeta) -> {
- startPipeTaskByConsensusGroup(pipeName, creationTime, consensusGroupId);
- }));
+ // trigger start() method for each pipe task
+ final Map<TConsensusGroupId, PipeTask> pipeTasks =
+ pipeTaskManager.getPipeTasks(existedPipeMeta.getStaticMeta());
+ if (pipeTasks == null) {
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has already been dropped or has not been created. Skip starting.",
+ pipeName,
+ creationTime);
+ return;
+ }
+ for (PipeTask pipeTask : pipeTasks.values()) {
+ pipeTask.start();
+ }
+
// set pipe meta status to RUNNING
existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.RUNNING);
}
- public void startPipeTaskByConsensusGroup(
- String pipeName, long creationTime, TConsensusGroupId consensusGroupId) {}
-
- public void stopPipe(String pipeName, long creationTime) {
+ public synchronized void stopPipe(String pipeName, long creationTime) {
final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
if (existedPipeMeta == null) {
@@ -284,18 +292,21 @@ public class PipeTaskAgent {
"Unexpected status: " + existedPipeMeta.getRuntimeMeta().getStatus().get().name());
}
- // stop pipe task by consensus group
- existedPipeMeta
- .getRuntimeMeta()
- .getConsensusGroupIdToTaskMetaMap()
- .forEach(
- ((consensusGroupId, pipeTaskMeta) -> {
- stopPipeTaskByConsensusGroup(pipeName, creationTime, consensusGroupId);
- }));
+ // trigger stop() method for each pipe task
+ final Map<TConsensusGroupId, PipeTask> pipeTasks =
+ pipeTaskManager.getPipeTasks(existedPipeMeta.getStaticMeta());
+ if (pipeTasks == null) {
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has already been dropped or has not been created. Skip stopping.",
+ pipeName,
+ creationTime);
+ return;
+ }
+ for (PipeTask pipeTask : pipeTasks.values()) {
+ pipeTask.stop();
+ }
+
// set pipe meta status to STOPPED
existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
}
-
- public void stopPipeTaskByConsensusGroup(
- String pipeName, long creationTime, TConsensusGroupId consensusGroupId) {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
index b75d2168da..5906de3a49 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
@@ -21,8 +21,9 @@ package org.apache.iotdb.db.pipe.config;
public class PipeCollectorConstant {
+ public static final String COLLECTOR_KEY = "collector";
public static final String PATTERN_PATTERN_KEY = "collector.pattern";
- public static final String PATTERN_DATA_REGION_KEY = "collector.data-region";
+ public static final String DATA_REGION_KEY = "collector.data-region";
private PipeCollectorConstant() {
throw new IllegalStateException("Utility class");
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
similarity index 80%
copy from server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
index b75d2168da..67a637503b 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
@@ -19,12 +19,11 @@
package org.apache.iotdb.db.pipe.config;
-public class PipeCollectorConstant {
+public class PipeConnectorConstant {
- public static final String PATTERN_PATTERN_KEY = "collector.pattern";
- public static final String PATTERN_DATA_REGION_KEY = "collector.data-region";
+ public static final String CONNECTOR_KEY = "connector";
- private PipeCollectorConstant() {
+ private PipeConnectorConstant() {
throw new IllegalStateException("Utility class");
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeProcessorConstant.java
similarity index 80%
copy from server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/config/PipeProcessorConstant.java
index b75d2168da..1af34f3ef3 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeProcessorConstant.java
@@ -19,12 +19,11 @@
package org.apache.iotdb.db.pipe.config;
-public class PipeCollectorConstant {
+public class PipeProcessorConstant {
- public static final String PATTERN_PATTERN_KEY = "collector.pattern";
- public static final String PATTERN_DATA_REGION_KEY = "collector.data-region";
+ public static final String PROCESSOR_KEY = "processor";
- private PipeCollectorConstant() {
+ private PipeProcessorConstant() {
throw new IllegalStateException("Utility class");
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
index 0fc9fdf69e..d7526fa187 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.core.collector;
import org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionTsFileCollector;
import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
+import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
import org.apache.iotdb.pipe.api.PipeCollector;
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
@@ -38,9 +39,9 @@ public class IoTDBDataRegionCollector implements PipeCollector {
// TODO: support pattern in historical collector
private final PipeHistoricalDataRegionTsFileCollector historicalCollector;
- public IoTDBDataRegionCollector() {
+ public IoTDBDataRegionCollector(ListenableUnblockingPendingQueue<Event> collectorPendingQueue) {
hasBeenStarted = new AtomicBoolean(false);
- realtimeCollector = new PipeRealtimeDataRegionHybridCollector();
+ realtimeCollector = new PipeRealtimeDataRegionHybridCollector(collectorPendingQueue);
historicalCollector = new PipeHistoricalDataRegionTsFileCollector();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index 9ba2f3f9b7..4559ca8a53 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -43,13 +43,13 @@ public class PipeHistoricalDataRegionTsFileCollector implements PipeCollector {
@Override
public void validate(PipeParameterValidator validator) throws Exception {
- validator.validateRequiredAttribute(PipeCollectorConstant.PATTERN_DATA_REGION_KEY);
+ validator.validateRequiredAttribute(PipeCollectorConstant.DATA_REGION_KEY);
}
@Override
public void customize(
PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {
- dataRegionId = parameters.getInt(PipeCollectorConstant.PATTERN_DATA_REGION_KEY);
+ dataRegionId = parameters.getInt(PipeCollectorConstant.DATA_REGION_KEY);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
index 0825b14023..08e731df13 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
@@ -35,14 +35,14 @@ public abstract class PipeRealtimeDataRegionCollector implements PipeCollector {
@Override
public void validate(PipeParameterValidator validator) throws Exception {
validator.validateRequiredAttribute(PipeCollectorConstant.PATTERN_PATTERN_KEY);
- validator.validateRequiredAttribute(PipeCollectorConstant.PATTERN_DATA_REGION_KEY);
+ validator.validateRequiredAttribute(PipeCollectorConstant.DATA_REGION_KEY);
}
@Override
public void customize(
PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {
pattern = parameters.getString(PipeCollectorConstant.PATTERN_PATTERN_KEY);
- dataRegionId = parameters.getString(PipeCollectorConstant.PATTERN_DATA_REGION_KEY);
+ dataRegionId = parameters.getString(PipeCollectorConstant.DATA_REGION_KEY);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index a48654d292..d8b79fbfcf 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -22,13 +22,12 @@ package org.apache.iotdb.db.pipe.core.collector.realtime;
import org.apache.iotdb.db.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
+import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
import org.apache.iotdb.pipe.api.event.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.ArrayBlockingQueue;
-
// TODO: make this collector as a builtin pipe plugin. register it in BuiltinPipePlugin.
public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegionCollector {
@@ -38,12 +37,11 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
// TODO: memory control
// This queue is used to store pending events collected by the method collect(). The method
// supply() will poll events from this queue and send them to the next pipe plugin.
- private final ArrayBlockingQueue<PipeRealtimeCollectEvent> pendingQueue;
+ private final ListenableUnblockingPendingQueue<Event> pendingQueue;
- public PipeRealtimeDataRegionHybridCollector() {
- this.pendingQueue =
- new ArrayBlockingQueue<>(
- PipeConfig.getInstance().getRealtimeCollectorPendingQueueCapacity());
+ public PipeRealtimeDataRegionHybridCollector(
+ ListenableUnblockingPendingQueue<Event> pendingQueue) {
+ this.pendingQueue = pendingQueue;
}
@Override
@@ -90,9 +88,9 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
String.format(
"Pending Queue of Hybrid Realtime Collector %s has reached capacity, discard TsFile Event %s, current state %s",
this, event, event.getTsFileEpoch().getState(this)));
- // TODO: more degradation strategies
- // TODO: dynamic control of the pending queue capacity
- // TODO: should be handled by the PipeRuntimeAgent
+ // this would not happen, but just in case.
+ // ListenableUnblockingPendingQueue is unbounded, so it should never reach capacity.
+ // TODO: memory control when elements in queue are too many.
}
}
@@ -103,7 +101,7 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
@Override
public Event supply() {
- PipeRealtimeCollectEvent collectEvent = pendingQueue.poll();
+ PipeRealtimeCollectEvent collectEvent = (PipeRealtimeCollectEvent) pendingQueue.poll();
while (collectEvent != null) {
Event suppliedEvent;
@@ -124,7 +122,7 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
return suppliedEvent;
}
- collectEvent = pendingQueue.poll();
+ collectEvent = (PipeRealtimeCollectEvent) pendingQueue.poll();
}
// means the pending queue is empty.
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
index 7e71ae2f7a..9ed53ae310 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
@@ -20,20 +20,36 @@
package org.apache.iotdb.db.pipe.core.connector;
import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
+import org.apache.iotdb.pipe.api.event.Event;
public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
private final PipeConnectorSubtaskExecutor executor;
private final PipeConnectorSubtask subtask;
+ private final ListenableBlockingPendingQueue<Event> pendingQueue;
private int runningTaskCount;
private int aliveTaskCount;
public PipeConnectorSubtaskLifeCycle(
- PipeConnectorSubtaskExecutor executor, PipeConnectorSubtask subtask) {
+ PipeConnectorSubtaskExecutor executor,
+ PipeConnectorSubtask subtask,
+ ListenableBlockingPendingQueue<Event> pendingQueue) {
this.executor = executor;
this.subtask = subtask;
+ this.pendingQueue = pendingQueue;
+
+ pendingQueue.registerEmptyToNotEmptyListener(
+ subtask.getTaskID(),
+ () -> {
+ if (hasRunningTasks()) {
+ executor.start(subtask.getTaskID());
+ }
+ });
+ this.pendingQueue.registerNotEmptyToEmptyListener(
+ subtask.getTaskID(), () -> executor.stop(subtask.getTaskID()));
runningTaskCount = 0;
aliveTaskCount = 0;
@@ -43,6 +59,10 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
return subtask;
}
+ public ListenableBlockingPendingQueue<Event> getPendingQueue() {
+ return pendingQueue;
+ }
+
public synchronized void register() {
if (aliveTaskCount < 0) {
throw new IllegalStateException("aliveTaskCount < 0");
@@ -63,7 +83,7 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
throw new IllegalStateException("aliveTaskCount <= 0");
}
if (aliveTaskCount == 1) {
- executor.deregister(subtask.getTaskID());
+ close();
// this subtask is out of life cycle, should never be used again
return true;
}
@@ -93,6 +113,13 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
@Override
public synchronized void close() {
+ pendingQueue.removeEmptyToNotEmptyListener(subtask.getTaskID());
+ pendingQueue.removeNotEmptyToEmptyListener(subtask.getTaskID());
+
executor.deregister(subtask.getTaskID());
}
+
+ private synchronized boolean hasRunningTasks() {
+ return runningTaskCount > 0;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
index a00cedf7b5..070e4e05b2 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
@@ -21,9 +21,11 @@ package org.apache.iotdb.db.pipe.core.connector;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
import java.util.HashMap;
@@ -42,10 +44,13 @@ public class PipeConnectorSubtaskManager {
if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
final PipeConnector pipeConnector = PipeAgent.plugin().reflectConnector(connectorAttributes);
+ // TODO: make pendingQueue size configurable
+ final ListenableBlockingPendingQueue<Event> pendingQueue =
+ new ListenableBlockingPendingQueue<>(65535);
final PipeConnectorSubtask pipeConnectorSubtask =
- new PipeConnectorSubtask(attributeSortedString, pipeConnector);
+ new PipeConnectorSubtask(attributeSortedString, pendingQueue, pipeConnector);
final PipeConnectorSubtaskLifeCycle pipeConnectorSubtaskLifeCycle =
- new PipeConnectorSubtaskLifeCycle(executor, pipeConnectorSubtask);
+ new PipeConnectorSubtaskLifeCycle(executor, pipeConnectorSubtask, pendingQueue);
attributeSortedString2SubtaskLifeCycleMap.put(
attributeSortedString, pipeConnectorSubtaskLifeCycle);
}
@@ -93,6 +98,16 @@ public class PipeConnectorSubtaskManager {
return attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).getSubtask();
}
+ public ListenableBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
+ String attributeSortedString) {
+ if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
+ throw new PipeException(
+ "Failed to get PendingQueue. No such subtask: " + attributeSortedString);
+ }
+
+ return attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).getPendingQueue();
+ }
+
///////////////////////// Singleton Instance Holder /////////////////////////
private PipeConnectorSubtaskManager() {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
index 7cc0778193..0d1d60fdde 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
@@ -19,21 +19,72 @@
package org.apache.iotdb.db.pipe.core.event.view.collector;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
import org.apache.iotdb.pipe.api.collector.EventCollector;
+import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
-import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Queue;
public class PipeEventCollector implements EventCollector {
+ private final ListenableBlockingPendingQueue<Event> pendingQueue;
+
+ // buffer queue is used to store events that are not offered to pending queue
+ // because the pending queue is full. when pending queue is full, pending queue
+ // will notify tasks to stop collecting events, and buffer queue will be used to store
+ // events before tasks are stopped. when pending queue is not full and tasks are
+ // notified by the pending queue to start collecting events, buffer queue will be used to store
+ // events before events in buffer queue are offered to pending queue.
+ private final Queue<Event> bufferQueue;
+
+ public PipeEventCollector(ListenableBlockingPendingQueue<Event> pendingQueue) {
+ this.pendingQueue = pendingQueue;
+ bufferQueue = new LinkedList<>();
+ }
+
@Override
- public void collectTabletInsertionEvent(TabletInsertionEvent event) throws IOException {}
+ public void collectTabletInsertionEvent(TabletInsertionEvent event) {
+ collect(event);
+ }
@Override
- public void collectTsFileInsertionEvent(TsFileInsertionEvent event) throws IOException {}
+ public void collectTsFileInsertionEvent(TsFileInsertionEvent event) {
+ collect(event);
+ }
@Override
- public void collectDeletionEvent(DeletionEvent event) throws IOException {}
+ public void collectDeletionEvent(DeletionEvent event) {
+ collect(event);
+ }
+
+ private synchronized void collect(Event event) {
+ while (!bufferQueue.isEmpty()) {
+ final Event bufferedEvent = bufferQueue.peek();
+ if (pendingQueue.offer(bufferedEvent)) {
+ bufferQueue.poll();
+ } else {
+ bufferQueue.offer(event);
+ return;
+ }
+ }
+
+ if (!pendingQueue.offer(event)) {
+ bufferQueue.offer(event);
+ }
+ }
+
+ public synchronized void tryCollectBufferedEvents() {
+ while (!bufferQueue.isEmpty()) {
+ final Event bufferedEvent = bufferQueue.peek();
+ if (pendingQueue.offer(bufferedEvent)) {
+ bufferQueue.poll();
+ } else {
+ return;
+ }
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeTaskExecutorManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java
similarity index 86%
rename from server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeTaskExecutorManager.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java
index 8698a23d86..fc6a335ad0 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeTaskExecutorManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java
@@ -26,9 +26,9 @@ import org.slf4j.LoggerFactory;
* PipeTaskExecutor is responsible for executing the pipe tasks, and it is scheduled by the
* PipeTaskScheduler. It is a singleton class.
*/
-public class PipeTaskExecutorManager {
+public class PipeSubtaskExecutorManager {
- private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskExecutorManager.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(PipeSubtaskExecutorManager.class);
private final PipeAssignerSubtaskExecutor assignerSubtaskExecutor;
private final PipeProcessorSubtaskExecutor processorSubtaskExecutor;
@@ -48,19 +48,19 @@ public class PipeTaskExecutorManager {
///////////////////////// Singleton Instance Holder /////////////////////////
- private PipeTaskExecutorManager() {
+ private PipeSubtaskExecutorManager() {
assignerSubtaskExecutor = new PipeAssignerSubtaskExecutor();
processorSubtaskExecutor = new PipeProcessorSubtaskExecutor();
connectorSubtaskExecutor = new PipeConnectorSubtaskExecutor();
}
private static class PipeTaskExecutorHolder {
- private static PipeTaskExecutorManager instance = null;
+ private static PipeSubtaskExecutorManager instance = null;
}
- public static PipeTaskExecutorManager setupAndGetInstance() {
+ public static synchronized PipeSubtaskExecutorManager getInstance() {
if (PipeTaskExecutorHolder.instance == null) {
- PipeTaskExecutorHolder.instance = new PipeTaskExecutorManager();
+ PipeTaskExecutorHolder.instance = new PipeSubtaskExecutorManager();
}
return PipeTaskExecutorHolder.instance;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java
index 188bbac0e4..4f035ca671 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.pipe.execution.scheduler;
-import org.apache.iotdb.db.pipe.execution.executor.PipeTaskExecutorManager;
+import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
/**
* PipeTaskScheduler is a singleton class that manages the numbers of threads used by
@@ -27,34 +27,34 @@ import org.apache.iotdb.db.pipe.execution.executor.PipeTaskExecutorManager;
*/
public class PipeTaskScheduler {
- private final PipeTaskExecutorManager pipeTaskExecutorManager =
- PipeTaskExecutorManager.setupAndGetInstance();
+ private final PipeSubtaskExecutorManager pipeSubtaskExecutorManager =
+ PipeSubtaskExecutorManager.getInstance();
public void adjustAssignerSubtaskExecutorThreadNum(int threadNum) {
// TODO: make it configurable by setting different parameters
- pipeTaskExecutorManager.getAssignerSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
+ pipeSubtaskExecutorManager.getAssignerSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
}
public int getAssignerSubtaskExecutorThreadNum() {
- return pipeTaskExecutorManager.getAssignerSubtaskExecutor().getExecutorThreadNumber();
+ return pipeSubtaskExecutorManager.getAssignerSubtaskExecutor().getExecutorThreadNumber();
}
public void adjustConnectorSubtaskExecutorThreadNum(int threadNum) {
// TODO: make it configurable by setting different parameters
- pipeTaskExecutorManager.getConnectorSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
+ pipeSubtaskExecutorManager.getConnectorSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
}
public int getConnectorSubtaskExecutorThreadNum() {
- return pipeTaskExecutorManager.getConnectorSubtaskExecutor().getExecutorThreadNumber();
+ return pipeSubtaskExecutorManager.getConnectorSubtaskExecutor().getExecutorThreadNumber();
}
public void adjustProcessorSubtaskExecutorThreadNum(int threadNum) {
// TODO: make it configurable by setting different parameters
- pipeTaskExecutorManager.getProcessorSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
+ pipeSubtaskExecutorManager.getProcessorSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
}
public int getProcessorSubtaskExecutorThreadNum() {
- return pipeTaskExecutorManager.getProcessorSubtaskExecutor().getExecutorThreadNumber();
+ return pipeSubtaskExecutorManager.getProcessorSubtaskExecutor().getExecutorThreadNumber();
}
///////////////////////// Singleton Instance Holder /////////////////////////
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
new file mode 100644
index 0000000000..b825784526
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class PipeBuilder {
+
+ private final PipeMeta pipeMeta;
+
+ public PipeBuilder(PipeMeta pipeMeta) {
+ this.pipeMeta = pipeMeta;
+ }
+
+ public Map<TConsensusGroupId, PipeTask> build() {
+ final PipeStaticMeta pipeStaticMeta = pipeMeta.getStaticMeta();
+ final String pipeName = pipeStaticMeta.getPipeName();
+ final PipeParameters collectorParameters = pipeStaticMeta.getCollectorParameters();
+ final PipeParameters processorParameters = pipeStaticMeta.getProcessorParameters();
+ final PipeParameters connectorParameters = pipeStaticMeta.getConnectorParameters();
+
+ final Map<TConsensusGroupId, PipeTask> consensusGroupIdToPipeTaskMap = new HashMap<>();
+
+ final PipeRuntimeMeta pipeRuntimeMeta = pipeMeta.getRuntimeMeta();
+ for (Map.Entry<TConsensusGroupId, PipeTaskMeta> consensusGroupIdPipeTaskMeta :
+ pipeRuntimeMeta.getConsensusGroupIdToTaskMetaMap().entrySet()) {
+ consensusGroupIdToPipeTaskMap.put(
+ consensusGroupIdPipeTaskMeta.getKey(),
+ new PipeTaskBuilder(
+ pipeName,
+ Integer.toString(consensusGroupIdPipeTaskMeta.getValue().getRegionLeader()),
+ // TODO: consensusGroupIdPipeTaskMeta.getValue().getProgressIndex() is not used
+ collectorParameters,
+ processorParameters,
+ connectorParameters)
+ .build());
+ }
+
+ return consensusGroupIdToPipeTaskMap;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
index 416c2c17fa..26475c5c4b 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
@@ -24,17 +24,20 @@ import org.apache.iotdb.db.pipe.task.stage.PipeTaskStage;
public class PipeTask {
private final String pipeName;
+ private final String dataRegionId;
private final PipeTaskStage collectorStage;
private final PipeTaskStage processorStage;
private final PipeTaskStage connectorStage;
- public PipeTask(
+ PipeTask(
String pipeName,
+ String dataRegionId,
PipeTaskStage collectorStage,
PipeTaskStage processorStage,
PipeTaskStage connectorStage) {
this.pipeName = pipeName;
+ this.dataRegionId = dataRegionId;
this.collectorStage = collectorStage;
this.processorStage = processorStage;
@@ -65,6 +68,10 @@ public class PipeTask {
connectorStage.stop();
}
+ public String getDataRegionId() {
+ return dataRegionId;
+ }
+
public String getPipeName() {
return pipeName;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
index 19e5f460ea..23fb60c9a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
@@ -19,5 +19,61 @@
package org.apache.iotdb.db.pipe.task;
-/** PipeTaskBuilder is used to build a PipeTask. */
-public class PipeTaskBuilder {}
+import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
+import org.apache.iotdb.db.pipe.task.stage.PipeTaskCollectorStage;
+import org.apache.iotdb.db.pipe.task.stage.PipeTaskConnectorStage;
+import org.apache.iotdb.db.pipe.task.stage.PipeTaskProcessorStage;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+
+public class PipeTaskBuilder {
+
+ private final String pipeName;
+ private final String dataRegionId;
+ private final PipeParameters pipeCollectorParameters;
+ private final PipeParameters pipeProcessorParameters;
+ private final PipeParameters pipeConnectorParameters;
+
+ PipeTaskBuilder(
+ String pipeName,
+ String dataRegionId,
+ PipeParameters pipeCollectorParameters,
+ PipeParameters pipeProcessorParameters,
+ PipeParameters pipeConnectorParameters) {
+ this.pipeName = pipeName;
+ this.dataRegionId = dataRegionId;
+ this.pipeCollectorParameters = pipeCollectorParameters;
+ this.pipeProcessorParameters = pipeProcessorParameters;
+ this.pipeConnectorParameters = pipeConnectorParameters;
+ }
+
+ PipeTaskBuilder(String dataRegionId, PipeStaticMeta pipeStaticMeta) {
+ this(
+ pipeStaticMeta.getPipeName(),
+ dataRegionId,
+ pipeStaticMeta.getCollectorParameters(),
+ pipeStaticMeta.getProcessorParameters(),
+ pipeStaticMeta.getConnectorParameters());
+ }
+
+ public PipeTask build() {
+ // event flow: collector -> processor -> connector
+
+ // we first build the collector and connector, then build the processor.
+ final PipeTaskCollectorStage collectorStage =
+ new PipeTaskCollectorStage(dataRegionId, pipeCollectorParameters);
+ final PipeTaskConnectorStage connectorStage =
+ new PipeTaskConnectorStage(pipeConnectorParameters);
+
+ // the processor connects the collector and connector.
+ final PipeTaskProcessorStage processorStage =
+ new PipeTaskProcessorStage(
+ pipeName,
+ dataRegionId,
+ collectorStage.getEventSupplier(),
+ collectorStage.getCollectorPendingQueue(),
+ pipeProcessorParameters,
+ connectorStage.getPipeConnectorPendingQueue());
+
+ return new PipeTask(pipeName, dataRegionId, collectorStage, processorStage, connectorStage);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskManager.java
new file mode 100644
index 0000000000..016b2537f9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskManager.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class PipeTaskManager {
+
+ private final Map<PipeStaticMeta, Map<TConsensusGroupId, PipeTask>> pipeMap = new HashMap<>();
+
+ /** Add pipe task by pipe static meta and consensus group id. */
+ public synchronized void addPipeTask(
+ PipeStaticMeta pipeStaticMeta, TConsensusGroupId consensusGroupId, PipeTask pipeTask) {
+ pipeMap.computeIfAbsent(pipeStaticMeta, k -> new HashMap<>()).put(consensusGroupId, pipeTask);
+ }
+
+ /** Add pipe tasks by pipe static meta. */
+ public synchronized void addPipeTasks(
+ PipeStaticMeta pipeStaticMeta, Map<TConsensusGroupId, PipeTask> pipeTasks) {
+ pipeMap.computeIfAbsent(pipeStaticMeta, k -> new HashMap<>()).putAll(pipeTasks);
+ }
+
+ /**
+ * Remove pipe task by pipe static meta and consensus group id.
+ *
+ * @param pipeStaticMeta pipe static meta
+ * @param consensusGroupId consensus group id
+ * @return pipe task if exists, null otherwise
+ */
+ public synchronized PipeTask removePipeTask(
+ PipeStaticMeta pipeStaticMeta, TConsensusGroupId consensusGroupId) {
+ Map<TConsensusGroupId, PipeTask> consensusGroupIdPipeTaskMap = pipeMap.get(pipeStaticMeta);
+ if (consensusGroupIdPipeTaskMap != null) {
+ return consensusGroupIdPipeTaskMap.remove(consensusGroupId);
+ }
+ return null;
+ }
+
+ /**
+ * Remove pipe tasks by pipe static meta.
+ *
+ * @param pipeStaticMeta pipe static meta
+ * @return pipe tasks if exists, null otherwise
+ */
+ public synchronized Map<TConsensusGroupId, PipeTask> removePipeTasks(
+ PipeStaticMeta pipeStaticMeta) {
+ return pipeMap.remove(pipeStaticMeta);
+ }
+
+ /**
+ * Get pipe task by pipe static meta and consensus group id.
+ *
+ * @param pipeStaticMeta pipe static meta
+ * @param consensusGroupId consensus group id
+ * @return pipe task if exists, null otherwise
+ */
+ public synchronized PipeTask getPipeTask(
+ PipeStaticMeta pipeStaticMeta, TConsensusGroupId consensusGroupId) {
+ Map<TConsensusGroupId, PipeTask> consensusGroupIdPipeTaskMap = pipeMap.get(pipeStaticMeta);
+ if (consensusGroupIdPipeTaskMap != null) {
+ return consensusGroupIdPipeTaskMap.get(consensusGroupId);
+ }
+ return null;
+ }
+
+ /**
+ * Get pipe tasks by pipe static meta.
+ *
+ * @param pipeStaticMeta pipe static meta
+ * @return pipe tasks if exists, null otherwise
+ */
+ public synchronized Map<TConsensusGroupId, PipeTask> getPipeTasks(PipeStaticMeta pipeStaticMeta) {
+ return pipeMap.get(pipeStaticMeta);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/EventSupplier.java
similarity index 65%
copy from server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/EventSupplier.java
index 39ffaefeeb..ea056dc22a 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/EventSupplier.java
@@ -17,21 +17,17 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.task.subtask;
+package org.apache.iotdb.db.pipe.task.queue;
-public class PipeAssignerSubtask extends PipeSubtask {
+import org.apache.iotdb.pipe.api.event.Event;
- public PipeAssignerSubtask(String taskID) {
- super(taskID);
- }
+@FunctionalInterface
+public interface EventSupplier {
- @Override
- protected void executeForAWhile() {
- // do nothing
- }
-
- @Override
- public void close() {
- // TODO
- }
+ /**
+ * @return the event to be supplied. the event may be null if the collector has no more events at
+ * the moment, but the collector is still running for more events.
+ * @throws Exception if the supplier fails to supply the event.
+ */
+ Event supply() throws Exception;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutorTest.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
similarity index 59%
rename from server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutorTest.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
index c3c5aedb86..ab3090a0ad 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutorTest.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
@@ -17,24 +17,15 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.execution.executor;
+package org.apache.iotdb.db.pipe.task.queue;
-import org.apache.iotdb.db.pipe.task.subtask.PipeAssignerSubtask;
+import org.apache.iotdb.pipe.api.event.Event;
-import org.junit.Before;
-import org.mockito.Mockito;
+import org.eclipse.jetty.util.BlockingArrayQueue;
-public class PipeAssignerSubtaskExecutorTest extends PipeSubtaskExecutorTest {
+public class ListenableBlockingPendingQueue<E extends Event> extends ListenablePendingQueue<E> {
- @Before
- public void setUp() throws Exception {
- executor = new PipeAssignerSubtaskExecutor();
-
- subtask =
- Mockito.spy(
- new PipeAssignerSubtask("PipeAssignerSubtaskExecutorTest") {
- @Override
- public void executeForAWhile() {}
- });
+ public ListenableBlockingPendingQueue(int pendingQueueSize) {
+ super(new BlockingArrayQueue<>(pendingQueueSize));
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenablePendingQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenablePendingQueue.java
new file mode 100644
index 0000000000..f476d88053
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenablePendingQueue.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task.queue;
+
+import org.apache.iotdb.pipe.api.event.Event;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public abstract class ListenablePendingQueue<E extends Event> {
+
+ private final Queue<E> pendingQueue;
+
+ private final Map<String, PendingQueueEmptyToNotEmptyListener> emptyToNotEmptyListeners =
+ new ConcurrentHashMap<>();
+ private final Map<String, PendingQueueNotEmptyToEmptyListener> notEmptyToEmptyListeners =
+ new ConcurrentHashMap<>();
+ private final Map<String, PendingQueueFullToNotFullListener> fullToNotFullListeners =
+ new ConcurrentHashMap<>();
+ private final Map<String, PendingQueueNotFullToFullListener> notFullToFullListeners =
+ new ConcurrentHashMap<>();
+
+ private final AtomicBoolean isFull = new AtomicBoolean(false);
+
+ protected ListenablePendingQueue(Queue<E> pendingQueue) {
+ this.pendingQueue = pendingQueue;
+ }
+
+ public ListenablePendingQueue<E> registerEmptyToNotEmptyListener(
+ String id, PendingQueueEmptyToNotEmptyListener listener) {
+ emptyToNotEmptyListeners.put(id, listener);
+ return this;
+ }
+
+ public void removeEmptyToNotEmptyListener(String id) {
+ emptyToNotEmptyListeners.remove(id);
+ }
+
+ public void notifyEmptyToNotEmptyListeners() {
+ emptyToNotEmptyListeners
+ .values()
+ .forEach(PendingQueueEmptyToNotEmptyListener::onPendingQueueEmptyToNotEmpty);
+ }
+
+ public ListenablePendingQueue<E> registerNotEmptyToEmptyListener(
+ String id, PendingQueueNotEmptyToEmptyListener listener) {
+ notEmptyToEmptyListeners.put(id, listener);
+ return this;
+ }
+
+ public void removeNotEmptyToEmptyListener(String id) {
+ notEmptyToEmptyListeners.remove(id);
+ }
+
+ public void notifyNotEmptyToEmptyListeners() {
+ notEmptyToEmptyListeners
+ .values()
+ .forEach(PendingQueueNotEmptyToEmptyListener::onPendingQueueNotEmptyToEmpty);
+ }
+
+ public ListenablePendingQueue<E> registerFullToNotFullListener(
+ String id, PendingQueueFullToNotFullListener listener) {
+ fullToNotFullListeners.put(id, listener);
+ return this;
+ }
+
+ public void removeFullToNotFullListener(String id) {
+ fullToNotFullListeners.remove(id);
+ }
+
+ public void notifyFullToNotFullListeners() {
+ fullToNotFullListeners
+ .values()
+ .forEach(PendingQueueFullToNotFullListener::onPendingQueueFullToNotFull);
+ }
+
+ public ListenablePendingQueue<E> registerNotFullToFullListener(
+ String id, PendingQueueNotFullToFullListener listener) {
+ notFullToFullListeners.put(id, listener);
+ return this;
+ }
+
+ public void removeNotFullToFullListener(String id) {
+ notFullToFullListeners.remove(id);
+ }
+
+ public void notifyNotFullToFullListeners() {
+ notFullToFullListeners
+ .values()
+ .forEach(PendingQueueNotFullToFullListener::onPendingQueueNotFullToFull);
+ }
+
+ public boolean offer(E event) {
+ final boolean isEmpty = pendingQueue.isEmpty();
+ final boolean isAdded = pendingQueue.offer(event);
+
+ if (isAdded) {
+ // we don't use size() == 1 to check whether the listener should be called,
+ // because offer() and size() are not atomic, and we don't want to use lock
+ // to make them atomic.
+ if (isEmpty) {
+ notifyEmptyToNotEmptyListeners();
+ }
+ } else {
+ if (isFull.compareAndSet(false, true)) {
+ notifyNotFullToFullListeners();
+ }
+ }
+
+ return isAdded;
+ }
+
+ public E poll() {
+ final boolean isEmpty = pendingQueue.isEmpty();
+ final E event = pendingQueue.poll();
+
+ if (event == null) {
+ // we don't use size() == 0 to check whether the listener should be called,
+ // because poll() and size() are not atomic, and we don't want to use lock
+ // to make them atomic.
+ if (!isEmpty) {
+ notifyNotEmptyToEmptyListeners();
+ }
+ } else {
+ if (isFull.compareAndSet(true, false)) {
+ notifyFullToNotFullListeners();
+ }
+ }
+
+ return event;
+ }
+
+ public void clear() {
+ pendingQueue.clear();
+ }
+
+ public int size() {
+ return pendingQueue.size();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableUnblockingPendingQueue.java
similarity index 71%
rename from server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableUnblockingPendingQueue.java
index 39ffaefeeb..c2772b4eeb 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableUnblockingPendingQueue.java
@@ -17,21 +17,15 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.task.subtask;
+package org.apache.iotdb.db.pipe.task.queue;
-public class PipeAssignerSubtask extends PipeSubtask {
+import org.apache.iotdb.pipe.api.event.Event;
- public PipeAssignerSubtask(String taskID) {
- super(taskID);
- }
+import java.util.concurrent.ConcurrentLinkedQueue;
- @Override
- protected void executeForAWhile() {
- // do nothing
- }
+public class ListenableUnblockingPendingQueue<E extends Event> extends ListenablePendingQueue<E> {
- @Override
- public void close() {
- // TODO
+ public ListenableUnblockingPendingQueue() {
+ super(new ConcurrentLinkedQueue<>());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueEmptyToNotEmptyListener.java
similarity index 83%
copy from server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueEmptyToNotEmptyListener.java
index 19e5f460ea..d56b6e789e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueEmptyToNotEmptyListener.java
@@ -17,7 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.task;
+package org.apache.iotdb.db.pipe.task.queue;
-/** PipeTaskBuilder is used to build a PipeTask. */
-public class PipeTaskBuilder {}
+@FunctionalInterface
+public interface PendingQueueEmptyToNotEmptyListener {
+
+ void onPendingQueueEmptyToNotEmpty();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueFullToNotFullListener.java
similarity index 83%
copy from server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueFullToNotFullListener.java
index 19e5f460ea..33225505f7 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueFullToNotFullListener.java
@@ -17,7 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.task;
+package org.apache.iotdb.db.pipe.task.queue;
-/** PipeTaskBuilder is used to build a PipeTask. */
-public class PipeTaskBuilder {}
+@FunctionalInterface
+public interface PendingQueueFullToNotFullListener {
+
+ void onPendingQueueFullToNotFull();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotEmptyToEmptyListener.java
similarity index 83%
copy from server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotEmptyToEmptyListener.java
index 19e5f460ea..4225783739 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotEmptyToEmptyListener.java
@@ -17,7 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.task;
+package org.apache.iotdb.db.pipe.task.queue;
-/** PipeTaskBuilder is used to build a PipeTask. */
-public class PipeTaskBuilder {}
+@FunctionalInterface
+public interface PendingQueueNotEmptyToEmptyListener {
+
+ void onPendingQueueNotEmptyToEmpty();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotFullToFullListener.java
similarity index 83%
copy from server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotFullToFullListener.java
index 19e5f460ea..2433cd4b8d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotFullToFullListener.java
@@ -17,7 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.task;
+package org.apache.iotdb.db.pipe.task.queue;
-/** PipeTaskBuilder is used to build a PipeTask. */
-public class PipeTaskBuilder {}
+@FunctionalInterface
+public interface PendingQueueNotFullToFullListener {
+
+ void onPendingQueueNotFullToFull();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index 51a344d580..73c127f3da 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -19,29 +19,59 @@
package org.apache.iotdb.db.pipe.task.stage;
+import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
+import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.core.collector.IoTDBDataRegionCollector;
+import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
+import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
import org.apache.iotdb.pipe.api.PipeCollector;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
-public class PipeTaskCollectorStage implements PipeTaskStage {
+public class PipeTaskCollectorStage extends PipeTaskStage {
private final PipeParameters collectorParameters;
+ /**
+ * TODO: have a better way to control busy/idle status of PipeTaskCollectorStage.
+ *
+ * <p>Currently, this field is for IoTDBDataRegionCollector only. IoTDBDataRegionCollector uses
+ * collectorPendingQueue as an internal data structure to store realtime events.
+ *
+ * <p>PendingQueue can detect whether the queue is empty or not, and it can notify the
+ * PipeTaskProcessorStage to stop processing data when the queue is empty to avoid unnecessary
+ * processing, and it also can notify the PipeTaskProcessorStage to start processing data when the
+ * queue is not empty.
+ */
+ private ListenableUnblockingPendingQueue<Event> collectorPendingQueue;
+
private PipeCollector pipeCollector;
- PipeTaskCollectorStage(PipeParameters collectorParameters) {
+ public PipeTaskCollectorStage(String dataRegionId, PipeParameters collectorParameters) {
this.collectorParameters = collectorParameters;
+ // set data region id to collector parameters, so that collector can get data region id inside
+ // collector
+ collectorParameters.getAttribute().put(PipeCollectorConstant.DATA_REGION_KEY, dataRegionId);
}
@Override
- public void create() throws PipeException {
- this.pipeCollector = PipeAgent.plugin().reflectCollector(collectorParameters);
+ public void createSubtask() throws PipeException {
+ if (collectorParameters
+ .getStringOrDefault(
+ PipeCollectorConstant.COLLECTOR_KEY,
+ BuiltinPipePlugin.DEFAULT_COLLECTOR.getPipePluginName())
+ .equals(BuiltinPipePlugin.DEFAULT_COLLECTOR.getPipePluginName())) {
+ collectorPendingQueue = new ListenableUnblockingPendingQueue<>();
+ this.pipeCollector = new IoTDBDataRegionCollector(collectorPendingQueue);
+ } else {
+ this.pipeCollector = PipeAgent.plugin().reflectCollector(collectorParameters);
+ }
}
@Override
- public void start() throws PipeException {
+ public void startSubtask() throws PipeException {
try {
pipeCollector.start();
} catch (Exception e) {
@@ -50,12 +80,12 @@ public class PipeTaskCollectorStage implements PipeTaskStage {
}
@Override
- public void stop() throws PipeException {
+ public void stopSubtask() throws PipeException {
// collector continuously collects data, so do nothing in stop
}
@Override
- public void drop() throws PipeException {
+ public void dropSubtask() throws PipeException {
try {
pipeCollector.close();
} catch (Exception e) {
@@ -63,8 +93,11 @@ public class PipeTaskCollectorStage implements PipeTaskStage {
}
}
- @Override
- public PipeSubtask getSubtask() {
- throw new UnsupportedOperationException("Collector stage does not have subtask.");
+ public EventSupplier getEventSupplier() {
+ return () -> pipeCollector.supply();
+ }
+
+ public ListenableUnblockingPendingQueue<Event> getCollectorPendingQueue() {
+ return collectorPendingQueue;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index 0194dc5c8a..62cadbde87 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -19,114 +19,48 @@
package org.apache.iotdb.db.pipe.task.stage;
-import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.db.pipe.core.connector.PipeConnectorSubtaskManager;
-import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
-import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
+import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
-public class PipeTaskConnectorStage implements PipeTaskStage {
+public class PipeTaskConnectorStage extends PipeTaskStage {
- protected final PipeConnectorSubtaskExecutor executor;
- protected final PipeParameters connectorAttributes;
+ protected final PipeParameters pipeConnectorParameters;
- protected PipeStatus status = null;
protected String connectorSubtaskId = null;
- protected boolean hasBeenExternallyStopped = false;
- protected PipeTaskConnectorStage(
- PipeConnectorSubtaskExecutor executor, PipeParameters connectorAttributes) {
- this.executor = executor;
- this.connectorAttributes = connectorAttributes;
+ public PipeTaskConnectorStage(PipeParameters pipeConnectorParameters) {
+ this.pipeConnectorParameters = pipeConnectorParameters;
}
@Override
- public synchronized void create() throws PipeException {
- if (status != null) {
- if (status == PipeStatus.RUNNING) {
- throw new PipeException(
- String.format("The PipeConnectorSubtask %s has been started", connectorSubtaskId));
- }
- if (status == PipeStatus.DROPPED) {
- throw new PipeException(
- String.format("The PipeConnectorSubtask %s has been dropped", connectorSubtaskId));
- }
- // status == PipeStatus.STOPPED
- if (hasBeenExternallyStopped) {
- throw new PipeException(
- String.format(
- "The PipeConnectorSubtask %s has been externally stopped", connectorSubtaskId));
- }
- // otherwise, do nothing to allow retry strategy
- return;
- }
-
- // status == null, register the connector
+ public void createSubtask() throws PipeException {
connectorSubtaskId =
- PipeConnectorSubtaskManager.instance().register(executor, connectorAttributes);
- status = PipeStatus.STOPPED;
+ PipeConnectorSubtaskManager.instance()
+ .register(
+ PipeSubtaskExecutorManager.getInstance().getConnectorSubtaskExecutor(),
+ pipeConnectorParameters);
}
@Override
- public synchronized void start() throws PipeException {
- if (status == null) {
- throw new PipeException(
- String.format("The PipeConnectorSubtask %s has not been created", connectorSubtaskId));
- }
- if (status == PipeStatus.RUNNING) {
- // do nothing to allow retry strategy
- return;
- }
- if (status == PipeStatus.DROPPED) {
- throw new PipeException(
- String.format("The PipeConnectorSubtask %s has been dropped", connectorSubtaskId));
- }
-
- // status == PipeStatus.STOPPED, start the connector
+ public void startSubtask() throws PipeException {
PipeConnectorSubtaskManager.instance().start(connectorSubtaskId);
- status = PipeStatus.RUNNING;
}
@Override
- public synchronized void stop() throws PipeException {
- if (status == null) {
- throw new PipeException(
- String.format("The PipeConnectorSubtask %s has not been created", connectorSubtaskId));
- }
- if (status == PipeStatus.STOPPED) {
- // do nothing to allow retry strategy
- return;
- }
- if (status == PipeStatus.DROPPED) {
- throw new PipeException(
- String.format("The PipeConnectorSubtask %s has been dropped", connectorSubtaskId));
- }
-
- // status == PipeStatus.RUNNING, stop the connector
+ public void stopSubtask() throws PipeException {
PipeConnectorSubtaskManager.instance().stop(connectorSubtaskId);
- status = PipeStatus.STOPPED;
- hasBeenExternallyStopped = true;
}
@Override
- public synchronized void drop() throws PipeException {
- if (status == null) {
- throw new PipeException(
- String.format("The PipeConnectorSubtask %s has not been created", connectorSubtaskId));
- }
- if (status == PipeStatus.DROPPED) {
- // do nothing to allow retry strategy
- return;
- }
-
- // status == PipeStatus.RUNNING or PipeStatus.STOPPED, drop the connector
+ public void dropSubtask() throws PipeException {
PipeConnectorSubtaskManager.instance().deregister(connectorSubtaskId);
- status = PipeStatus.DROPPED;
}
- @Override
- public PipeSubtask getSubtask() {
- return PipeConnectorSubtaskManager.instance().getPipeConnectorSubtask(connectorSubtaskId);
+ public ListenableBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
+ return PipeConnectorSubtaskManager.instance().getPipeConnectorPendingQueue(connectorSubtaskId);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index e7345be1e1..77bc2de5f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -19,45 +19,114 @@
package org.apache.iotdb.db.pipe.task.stage;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.core.event.view.collector.PipeEventCollector;
import org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
-import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutor;
+import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
+import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenablePendingQueue;
import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
-import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
-public class PipeTaskProcessorStage implements PipeTaskStage {
+import javax.annotation.Nullable;
- protected final PipeSubtaskExecutor executor;
- protected final PipeSubtask subtask;
+public class PipeTaskProcessorStage extends PipeTaskStage {
- protected PipeTaskProcessorStage(
- PipeProcessorSubtaskExecutor executor, PipeProcessorSubtask subtask) {
- this.executor = executor;
- this.subtask = subtask;
+ protected final PipeProcessorSubtaskExecutor executor =
+ PipeSubtaskExecutorManager.getInstance().getProcessorSubtaskExecutor();
+
+ protected final PipeProcessorSubtask subtask;
+
+ protected final ListenablePendingQueue<Event> pipeCollectorInputPendingQueue;
+ protected final ListenablePendingQueue<Event> pipeConnectorOutputPendingQueue;
+
+ /**
+ * @param pipeName pipe name
+ * @param dataRegionId data region id
+ * @param pipeCollectorInputEventSupplier used to input events from pipe collector
+ * @param pipeCollectorInputPendingQueue used to listen whether pipe collector event queue is from
+ * empty to not empty or from not empty to empty, null means no need to listen
+ * @param pipeProcessorParameters used to create pipe processor
+ * @param pipeConnectorOutputPendingQueue used to output events to pipe connector
+ */
+ public PipeTaskProcessorStage(
+ String pipeName,
+ String dataRegionId,
+ EventSupplier pipeCollectorInputEventSupplier,
+ @Nullable ListenablePendingQueue<Event> pipeCollectorInputPendingQueue,
+ PipeParameters pipeProcessorParameters,
+ ListenableBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue) {
+ final String taskId = pipeName + "_" + dataRegionId;
+ final PipeProcessor pipeProcessor =
+ PipeAgent.plugin().reflectProcessor(pipeProcessorParameters);
+ final PipeEventCollector pipeConnectorOutputEventCollector =
+ new PipeEventCollector(pipeConnectorOutputPendingQueue);
+
+ this.subtask =
+ new PipeProcessorSubtask(
+ taskId,
+ pipeCollectorInputEventSupplier,
+ pipeProcessor,
+ pipeConnectorOutputEventCollector);
+
+ this.pipeCollectorInputPendingQueue =
+ pipeCollectorInputPendingQueue != null
+ ? pipeCollectorInputPendingQueue
+ .registerEmptyToNotEmptyListener(
+ taskId,
+ () -> {
+ if (status == PipeStatus.RUNNING) {
+ executor.start(subtask.getTaskID());
+ }
+ })
+ .registerNotEmptyToEmptyListener(taskId, () -> executor.stop(subtask.getTaskID()))
+ : null;
+ this.pipeConnectorOutputPendingQueue =
+ pipeConnectorOutputPendingQueue
+ .registerNotFullToFullListener(taskId, () -> executor.stop(subtask.getTaskID()))
+ .registerFullToNotFullListener(
+ taskId,
+ () -> {
+ // only start when the pipe is running
+ if (status == PipeStatus.RUNNING) {
+ pipeConnectorOutputEventCollector.tryCollectBufferedEvents();
+ executor.start(subtask.getTaskID());
+ }
+ });
}
@Override
- public void create() throws PipeException {
+ public void createSubtask() throws PipeException {
executor.register(subtask);
}
@Override
- public void start() throws PipeException {
+ public void startSubtask() throws PipeException {
executor.start(subtask.getTaskID());
}
@Override
- public void stop() throws PipeException {
+ public void stopSubtask() throws PipeException {
executor.stop(subtask.getTaskID());
}
@Override
- public void drop() throws PipeException {
- executor.deregister(subtask.getTaskID());
- }
+ public void dropSubtask() throws PipeException {
+ final String taskId = subtask.getTaskID();
- @Override
- public PipeSubtask getSubtask() {
- return subtask;
+ if (pipeCollectorInputPendingQueue != null) {
+ pipeCollectorInputPendingQueue.removeEmptyToNotEmptyListener(taskId);
+ pipeCollectorInputPendingQueue.removeNotEmptyToEmptyListener(taskId);
+ }
+
+ pipeConnectorOutputPendingQueue.removeNotFullToFullListener(taskId);
+ pipeConnectorOutputPendingQueue.removeFullToNotFullListener(taskId);
+
+ executor.deregister(taskId);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
index 5f57fbabcb..e3a793c0d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
@@ -19,42 +19,113 @@
package org.apache.iotdb.db.pipe.task.stage;
-import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.pipe.api.exception.PipeException;
-public interface PipeTaskStage {
+public abstract class PipeTaskStage {
+
+ protected PipeStatus status = null;
+ protected boolean hasBeenExternallyStopped = false;
/**
* Create a pipe task stage.
*
* @throws PipeException if failed to create a pipe task stage.
*/
- void create() throws PipeException;
+ public synchronized void create() {
+ if (status != null) {
+ if (status == PipeStatus.RUNNING) {
+ throw new PipeException("The PipeTaskStage has been started");
+ }
+ if (status == PipeStatus.DROPPED) {
+ throw new PipeException("The PipeTaskStage has been dropped");
+ }
+ // status == PipeStatus.STOPPED
+ if (hasBeenExternallyStopped) {
+ throw new PipeException("The PipeTaskStage has been externally stopped");
+ }
+ // otherwise, do nothing to allow retry strategy
+ return;
+ }
+
+ // status == null, register the subtask
+ createSubtask();
+
+ status = PipeStatus.STOPPED;
+ }
+
+ protected abstract void createSubtask() throws PipeException;
+
/**
* Start a pipe task stage.
*
* @throws PipeException if failed to start a pipe task stage.
*/
- void start() throws PipeException;
+ public synchronized void start() {
+ if (status == null) {
+ throw new PipeException("The PipeTaskStage has not been created");
+ }
+ if (status == PipeStatus.RUNNING) {
+ // do nothing to allow retry strategy
+ return;
+ }
+ if (status == PipeStatus.DROPPED) {
+ throw new PipeException("The PipeTaskStage has been dropped");
+ }
+
+ // status == PipeStatus.STOPPED, start the subtask
+ startSubtask();
+
+ status = PipeStatus.RUNNING;
+ }
+
+ protected abstract void startSubtask() throws PipeException;
/**
* Stop a pipe task stage.
*
* @throws PipeException if failed to stop a pipe task stage.
*/
- void stop() throws PipeException;
+ public synchronized void stop() {
+ if (status == null) {
+ throw new PipeException("The PipeTaskStage has not been created");
+ }
+ if (status == PipeStatus.STOPPED) {
+ // do nothing to allow retry strategy
+ return;
+ }
+ if (status == PipeStatus.DROPPED) {
+ throw new PipeException("The PipeTaskStage has been dropped");
+ }
+
+ // status == PipeStatus.RUNNING, stop the connector
+ stopSubtask();
+
+ status = PipeStatus.STOPPED;
+ hasBeenExternallyStopped = true;
+ }
+
+ protected abstract void stopSubtask() throws PipeException;
/**
* Drop a pipe task stage.
*
* @throws PipeException if failed to drop a pipe task stage.
*/
- void drop() throws PipeException;
+ public synchronized void drop() {
+ if (status == null) {
+ throw new PipeException("The PipeTaskStage has not been created");
+ }
+ if (status == PipeStatus.DROPPED) {
+ // do nothing to allow retry strategy
+ return;
+ }
- /**
- * Get the pipe subtask.
- *
- * @return the pipe subtask.
- */
- PipeSubtask getSubtask();
+ // status == PipeStatus.RUNNING or PipeStatus.STOPPED, drop the connector
+ dropSubtask();
+
+ status = PipeStatus.DROPPED;
+ }
+
+ protected abstract void dropSubtask() throws PipeException;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index b420d9e51a..e2ceb64480 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -19,55 +19,60 @@
package org.apache.iotdb.db.pipe.task.subtask;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.ArrayBlockingQueue;
-
public class PipeConnectorSubtask extends PipeSubtask {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeConnectorSubtask.class);
- // input
- private final ArrayBlockingQueue<Event> pendingQueue;
- // output
- private final PipeConnector pipeConnector;
+ private final ListenableBlockingPendingQueue<Event> inputPendingQueue;
+ private final PipeConnector outputPipeConnector;
/** @param taskID connectorAttributeSortedString */
- public PipeConnectorSubtask(String taskID, PipeConnector pipeConnector) {
+ public PipeConnectorSubtask(
+ String taskID,
+ ListenableBlockingPendingQueue<Event> inputPendingQueue,
+ PipeConnector outputPipeConnector) {
super(taskID);
- // TODO: make the size of the queue size reasonable and configurable
- this.pendingQueue = new ArrayBlockingQueue<>(1024 * 1024);
- this.pipeConnector = pipeConnector;
- }
-
- public ArrayBlockingQueue<Event> getInputPendingQueue() {
- return pendingQueue;
+ this.inputPendingQueue = inputPendingQueue;
+ this.outputPipeConnector = outputPipeConnector;
}
// TODO: for a while
@Override
protected void executeForAWhile() {
- if (pendingQueue.isEmpty()) {
- return;
+ try {
+ // TODO: reduce the frequency of heartbeat
+ outputPipeConnector.heartbeat();
+ } catch (Exception e) {
+ throw new PipeConnectionException(
+ "PipeConnector: failed to connect to the target system.", e);
}
- final Event event = pendingQueue.poll();
+ final Event event = inputPendingQueue.poll();
+ if (event == null) {
+ return;
+ }
try {
if (event instanceof TabletInsertionEvent) {
- pipeConnector.transfer((TabletInsertionEvent) event);
+ outputPipeConnector.transfer((TabletInsertionEvent) event);
} else if (event instanceof TsFileInsertionEvent) {
- pipeConnector.transfer((TsFileInsertionEvent) event);
+ outputPipeConnector.transfer((TsFileInsertionEvent) event);
} else if (event instanceof DeletionEvent) {
- pipeConnector.transfer((DeletionEvent) event);
+ outputPipeConnector.transfer((DeletionEvent) event);
} else {
throw new RuntimeException("Unsupported event type: " + event.getClass().getName());
}
@@ -79,10 +84,50 @@ public class PipeConnectorSubtask extends PipeSubtask {
}
}
+ @Override
+ public void onFailure(@NotNull Throwable throwable) {
+ // retry to connect to the target system if the connection is broken
+ if (throwable instanceof PipeConnectionException) {
+ int retry = 0;
+ while (retry < MAX_RETRY_TIMES) {
+ try {
+ outputPipeConnector.handshake();
+ break;
+ } catch (Exception e) {
+ retry++;
+ LOGGER.error("Failed to reconnect to the target system, retrying... ({} time(s))", retry);
+ try {
+ // TODO: make the retry interval configurable
+ Thread.sleep(retry * 1000L);
+ } catch (InterruptedException interruptedException) {
+ LOGGER.info(
+ "Interrupted while sleeping, perhaps need to check whether the thread is interrupted.");
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ // stop current pipe task if failed to reconnect to the target system after MAX_RETRY_TIMES
+ // times
+ if (retry == MAX_RETRY_TIMES) {
+ LOGGER.error(
+ "Failed to reconnect to the target system after {} times, stopping current pipe task {}...",
+ MAX_RETRY_TIMES,
+ taskID);
+ lastFailedCause = throwable;
+ PipeAgent.runtime().report(this);
+ return;
+ }
+ }
+
+ // handle other exceptions as usual
+ super.onFailure(throwable);
+ }
+
@Override
public void close() {
try {
- pipeConnector.close();
+ outputPipeConnector.close();
} catch (Exception e) {
e.printStackTrace();
LOGGER.info(
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index 34f8045cb6..3b7a59aa9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.pipe.task.subtask;
+import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.event.Event;
@@ -30,34 +32,36 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.ArrayBlockingQueue;
-
public class PipeProcessorSubtask extends PipeSubtask {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeProcessorSubtask.class);
- private final ArrayBlockingQueue<Event> pendingEventQueue;
+ private final EventSupplier inputEventSupplier;
private final PipeProcessor pipeProcessor;
private final EventCollector outputEventCollector;
public PipeProcessorSubtask(
String taskID,
- ArrayBlockingQueue<Event> pendingEventQueue,
+ EventSupplier inputEventSupplier,
PipeProcessor pipeProcessor,
EventCollector outputEventCollector) {
super(taskID);
+ this.inputEventSupplier = inputEventSupplier;
this.pipeProcessor = pipeProcessor;
- this.pendingEventQueue = pendingEventQueue;
this.outputEventCollector = outputEventCollector;
}
@Override
- protected void executeForAWhile() {
- if (pendingEventQueue.isEmpty()) {
+ protected void executeForAWhile() throws Exception {
+ Event event = inputEventSupplier.supply();
+ if (event == null) {
return;
}
- final Event event = pendingEventQueue.poll();
+ if (event instanceof PipeRealtimeCollectEvent) {
+ // dispatch the event
+ event = ((PipeRealtimeCollectEvent) event).getEvent();
+ }
try {
if (event instanceof TabletInsertionEvent) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
index 45b0331700..e01c139018 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
@@ -38,17 +38,17 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
private static final Logger LOGGER = LoggerFactory.getLogger(PipeSubtask.class);
- private final String taskID;
+ protected final String taskID;
private ListeningExecutorService subtaskWorkerThreadPoolExecutor;
private ExecutorService subtaskCallbackListeningExecutor;
private final DecoratingLock callbackDecoratingLock = new DecoratingLock();
- private static final int MAX_RETRY_TIMES = 5;
+ protected static final int MAX_RETRY_TIMES = 5;
private final AtomicInteger retryCount = new AtomicInteger(0);
- private Throwable lastFailedCause;
+ protected Throwable lastFailedCause;
private final AtomicBoolean shouldStopSubmittingSelf = new AtomicBoolean(true);
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
index c453ee6af3..f376feb49c 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
@@ -70,7 +70,7 @@ public class CachedSchemaPatternMatcherTest {
new HashMap<String, String>() {
{
put(PipeCollectorConstant.PATTERN_PATTERN_KEY, "root");
- put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, "1");
+ put(PipeCollectorConstant.DATA_REGION_KEY, "1");
}
}),
null);
@@ -86,7 +86,7 @@ public class CachedSchemaPatternMatcherTest {
new HashMap<String, String>() {
{
put(PipeCollectorConstant.PATTERN_PATTERN_KEY, "root." + finalI1);
- put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, "1");
+ put(PipeCollectorConstant.DATA_REGION_KEY, "1");
}
}),
null);
@@ -100,7 +100,7 @@ public class CachedSchemaPatternMatcherTest {
new HashMap<String, String>() {
{
put(PipeCollectorConstant.PATTERN_PATTERN_KEY, "root." + finalI + "." + finalJ);
- put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, "1");
+ put(PipeCollectorConstant.DATA_REGION_KEY, "1");
}
}),
null);
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index f218e6865f..214b8b0774 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
import org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
+import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.EventType;
@@ -83,20 +84,20 @@ public class PipeRealtimeCollectTest {
// set up realtime collector
try (PipeRealtimeDataRegionHybridCollector collector1 =
- new PipeRealtimeDataRegionHybridCollector();
+ new PipeRealtimeDataRegionHybridCollector(new ListenableUnblockingPendingQueue<>());
PipeRealtimeDataRegionHybridCollector collector2 =
- new PipeRealtimeDataRegionHybridCollector();
+ new PipeRealtimeDataRegionHybridCollector(new ListenableUnblockingPendingQueue<>());
PipeRealtimeDataRegionHybridCollector collector3 =
- new PipeRealtimeDataRegionHybridCollector();
+ new PipeRealtimeDataRegionHybridCollector(new ListenableUnblockingPendingQueue<>());
PipeRealtimeDataRegionHybridCollector collector4 =
- new PipeRealtimeDataRegionHybridCollector()) {
+ new PipeRealtimeDataRegionHybridCollector(new ListenableUnblockingPendingQueue<>())) {
collector1.customize(
new PipeParameters(
new HashMap<String, String>() {
{
put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern1);
- put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, dataRegion1);
+ put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion1);
}
}),
null);
@@ -105,7 +106,7 @@ public class PipeRealtimeCollectTest {
new HashMap<String, String>() {
{
put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern2);
- put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, dataRegion1);
+ put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion1);
}
}),
null);
@@ -114,7 +115,7 @@ public class PipeRealtimeCollectTest {
new HashMap<String, String>() {
{
put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern1);
- put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, dataRegion2);
+ put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion2);
}
}),
null);
@@ -123,7 +124,7 @@ public class PipeRealtimeCollectTest {
new HashMap<String, String>() {
{
put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern2);
- put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, dataRegion2);
+ put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion2);
}
}),
null);
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
index 6fbc7fe2c6..52acbe9c2f 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.execution.executor;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -36,9 +37,8 @@ public class PipeConnectorSubtaskExecutorTest extends PipeSubtaskExecutorTest {
subtask =
Mockito.spy(
new PipeConnectorSubtask(
- "PipeConnectorSubtaskExecutorTest", mock(PipeConnector.class)) {
- @Override
- public void executeForAWhile() {}
- });
+ "PipeConnectorSubtaskExecutorTest",
+ mock(ListenableBlockingPendingQueue.class),
+ mock(PipeConnector.class)));
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
index 1abf2ab05e..d0a5208d53 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.execution.executor;
+import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.collector.EventCollector;
@@ -26,8 +27,6 @@ import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.junit.Before;
import org.mockito.Mockito;
-import java.util.concurrent.ArrayBlockingQueue;
-
import static org.mockito.Mockito.mock;
public class PipeProcessorSubtaskExecutorTest extends PipeSubtaskExecutorTest {
@@ -40,11 +39,8 @@ public class PipeProcessorSubtaskExecutorTest extends PipeSubtaskExecutorTest {
Mockito.spy(
new PipeProcessorSubtask(
"PipeProcessorSubtaskExecutorTest",
- mock(ArrayBlockingQueue.class),
+ mock(EventSupplier.class),
mock(PipeProcessor.class),
- mock(EventCollector.class)) {
- @Override
- public void executeForAWhile() {}
- });
+ mock(EventCollector.class)));
}
}