You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/14 19:24:25 UTC
[iotdb] branch master updated: [IOTDB-5873] Pipe: Support `CREATE`, `DROP`, `START`, `STOP` & `SHOW` Clauses (#9849)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 803c87fcdb0 [IOTDB-5873] Pipe: Support `CREATE`, `DROP`, `START`, `STOP` & `SHOW` Clauses (#9849)
803c87fcdb0 is described below
commit 803c87fcdb097292463eb23d387d7032c275aacf
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon May 15 03:24:18 2023 +0800
[IOTDB-5873] Pipe: Support `CREATE`, `DROP`, `START`, `STOP` & `SHOW` Clauses (#9849)
---
.../response/pipe/task/PipeTableResp.java | 76 ++++++++++++++++++-
.../iotdb/confignode/manager/ConfigManager.java | 11 +--
.../manager/pipe/PipeTaskCoordinator.java | 11 ++-
.../persistence/pipe/PipePluginInfo.java | 49 +++++++++++-
.../runtime/PipeHandleLeaderChangeProcedure.java | 4 +-
.../pipe/task/AbstractOperatePipeProcedureV2.java | 2 +-
.../impl/pipe/task/CreatePipeProcedureV2.java | 16 ++--
.../procedure/store/ProcedureFactory.java | 10 +++
.../iotdb/commons/pipe/task/meta/PipeMeta.java | 9 +--
.../commons/pipe/task/meta/PipeRuntimeMeta.java | 27 ++++++-
.../commons/pipe/task/meta/PipeStaticMeta.java | 88 ++++++++++++++++------
.../iotdb/commons/pipe/task/meta/PipeTaskMeta.java | 27 ++++++-
.../commons/pipe/task/meta/PipeMetaDeSerTest.java | 77 +++++++++++++++++++
.../config/executor/ClusterConfigTaskExecutor.java | 5 +-
.../iotdb/db/mpp/plan/parser/ASTVisitor.java | 12 ++-
.../db/pipe/config/PipeCollectorConstant.java | 5 +-
.../realtime/PipeRealtimeDataRegionCollector.java | 6 +-
.../connector/PipeConnectorSubtaskManager.java | 23 +++++-
.../db/pipe/task/stage/PipeTaskCollectorStage.java | 25 ++++--
.../db/pipe/task/stage/PipeTaskProcessorStage.java | 44 ++++++++---
.../impl/DataNodeInternalRPCServiceImpl.java | 6 +-
.../collector/CachedSchemaPatternMatcherTest.java | 8 +-
.../core/collector/PipeRealtimeCollectTest.java | 8 +-
23 files changed, 458 insertions(+), 91 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
index a0416ede651..2d6f401eefb 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
@@ -21,7 +21,12 @@ package org.apache.iotdb.confignode.consensus.response.pipe.task;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.consensus.common.DataSet;
import java.io.IOException;
@@ -39,11 +44,80 @@ public class PipeTableResp implements DataSet {
this.allPipeMeta = allPipeMeta;
}
- public TGetAllPipeInfoResp convertToThriftResponse() throws IOException {
+ public PipeTableResp filter(Boolean whereClause, String pipeName) {
+ if (whereClause == null) {
+ if (pipeName == null) {
+ return this;
+ } else {
+ final List<PipeMeta> filteredPipeMeta = new ArrayList<>();
+ for (PipeMeta pipeMeta : allPipeMeta) {
+ if (pipeMeta.getStaticMeta().getPipeName().equals(pipeName)) {
+ filteredPipeMeta.add(pipeMeta);
+ break;
+ }
+ }
+ return new PipeTableResp(status, filteredPipeMeta);
+ }
+ } else {
+ if (pipeName == null) {
+ return this;
+ } else {
+ String sortedConnectorParametersString = null;
+ for (PipeMeta pipeMeta : allPipeMeta) {
+ if (pipeMeta.getStaticMeta().getPipeName().equals(pipeName)) {
+ sortedConnectorParametersString =
+ pipeMeta.getStaticMeta().getConnectorParameters().toString();
+ break;
+ }
+ }
+
+ final List<PipeMeta> filteredPipeMeta = new ArrayList<>();
+ for (PipeMeta pipeMeta : allPipeMeta) {
+ if (pipeMeta
+ .getStaticMeta()
+ .getConnectorParameters()
+ .toString()
+ .equals(sortedConnectorParametersString)) {
+ filteredPipeMeta.add(pipeMeta);
+ }
+ }
+ return new PipeTableResp(status, filteredPipeMeta);
+ }
+ }
+ }
+
+ public TGetAllPipeInfoResp convertToTGetAllPipeInfoResp() throws IOException {
final List<ByteBuffer> pipeInformationByteBuffers = new ArrayList<>();
for (PipeMeta pipeMeta : allPipeMeta) {
pipeInformationByteBuffers.add(pipeMeta.serialize());
}
return new TGetAllPipeInfoResp(status, pipeInformationByteBuffers);
}
+
+ public TShowPipeResp convertToTShowPipeResp() {
+ final List<TShowPipeInfo> showPipeInfoList = new ArrayList<>();
+
+ for (PipeMeta pipeMeta : allPipeMeta) {
+ final PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
+ final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
+ final StringBuilder exceptionMessageBuilder = new StringBuilder();
+ for (PipeTaskMeta pipeTaskMeta : runtimeMeta.getConsensusGroupIdToTaskMetaMap().values()) {
+ for (Exception e : pipeTaskMeta.getExceptionMessages()) {
+ exceptionMessageBuilder.append(e.getMessage()).append("\n");
+ }
+ }
+
+ showPipeInfoList.add(
+ new TShowPipeInfo(
+ staticMeta.getPipeName(),
+ staticMeta.getCreationTime(),
+ runtimeMeta.getStatus().get().name(),
+ staticMeta.getCollectorParameters().toString(),
+ staticMeta.getProcessorParameters().toString(),
+ staticMeta.getConnectorParameters().toString(),
+ exceptionMessageBuilder.toString()));
+ }
+
+ return new TShowPipeResp().setStatus(status).setPipeInfoList(showPipeInfoList);
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 4d9a4cdc8c8..a6877c1bdca 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -1612,14 +1612,9 @@ public class ConfigManager implements IManager {
@Override
public TShowPipeResp showPipe(TShowPipeReq req) {
TSStatus status = confirmLeader();
- LOGGER.info("showPipe: {}", req);
- TShowPipeResp resp = new TShowPipeResp();
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- // TODO: Implement PipeManager
- return resp.setStatus(status);
- } else {
- return resp.setStatus(status);
- }
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ ? pipeManager.getPipeTaskCoordinator().showPipes(req)
+ : new TShowPipeResp().setStatus(status);
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
index c99bd7d1a04..e01f4b785bc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
@@ -78,7 +80,7 @@ public class PipeTaskCoordinator {
try {
return ((PipeTableResp)
configManager.getConsensusManager().read(new ShowPipePlanV2()).getDataset())
- .convertToThriftResponse();
+ .convertToTGetAllPipeInfoResp();
} catch (IOException e) {
LOGGER.error("Fail to get AllPipeInfo", e);
return new TGetAllPipeInfoResp(
@@ -88,6 +90,13 @@ public class PipeTaskCoordinator {
}
}
+ public TShowPipeResp showPipes(TShowPipeReq req) {
+ return ((PipeTableResp)
+ configManager.getConsensusManager().read(new ShowPipePlanV2()).getDataset())
+ .filter(req.whereClause, req.pipeName)
+ .convertToTShowPipeResp();
+ }
+
public TSStatus recordPipeMessage(TRecordPipeMessageReq req) {
throw new UnsupportedOperationException("Not implemented yet");
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 3818070daca..c954df5770e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -34,6 +34,10 @@ import org.apache.iotdb.confignode.consensus.response.pipe.plugin.PipePluginTabl
import org.apache.iotdb.confignode.consensus.response.udf.JarResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.PipeConnectorConstant;
+import org.apache.iotdb.db.pipe.config.PipeProcessorConstant;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeManagementException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -52,6 +56,9 @@ import java.util.List;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;
+import static org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.DEFAULT_COLLECTOR;
+import static org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.DO_NOTHING_PROCESSOR;
+
public class PipePluginInfo implements SnapshotProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(PipePluginInfo.class);
@@ -115,8 +122,46 @@ public class PipePluginInfo implements SnapshotProcessor {
}
public boolean checkBeforeCreatePipe(TCreatePipeReq createPipeRequest) {
- // TODO: validate the plugins in the create pipe Req.
- throw new UnsupportedOperationException("Not implemented yet");
+ final PipeParameters collectorParameters =
+ new PipeParameters(createPipeRequest.getCollectorAttributes());
+ final String collectorPluginName =
+ collectorParameters.getStringOrDefault(
+ PipeCollectorConstant.COLLECTOR_KEY, DEFAULT_COLLECTOR.getPipePluginName());
+ if (!pipePluginMetaKeeper.containsPipePlugin(collectorPluginName)) {
+ LOGGER.warn(
+ "Failed to create pipe, the pipe collector plugin {} does not exist",
+ collectorPluginName);
+ return false;
+ }
+
+ final PipeParameters processorParameters =
+ new PipeParameters(createPipeRequest.getProcessorAttributes());
+ final String processorPluginName =
+ processorParameters.getStringOrDefault(
+ PipeProcessorConstant.PROCESSOR_KEY, DO_NOTHING_PROCESSOR.getPipePluginName());
+ if (!pipePluginMetaKeeper.containsPipePlugin(processorPluginName)) {
+ LOGGER.warn(
+ "Failed to create pipe, the pipe processor plugin {} does not exist",
+ processorPluginName);
+ return false;
+ }
+
+ final PipeParameters connectorParameters =
+ new PipeParameters(createPipeRequest.getConnectorAttributes());
+ if (!connectorParameters.hasAttribute(PipeConnectorConstant.CONNECTOR_KEY)) {
+ LOGGER.warn("Failed to create pipe, the pipe connector plugin is not specified");
+ return false;
+ }
+ final String connectorPluginName =
+ connectorParameters.getString(PipeConnectorConstant.CONNECTOR_KEY);
+ if (!pipePluginMetaKeeper.containsPipePlugin(connectorPluginName)) {
+ LOGGER.warn(
+ "Failed to create pipe, the pipe connector plugin {} does not exist",
+ connectorPluginName);
+ return false;
+ }
+
+ return true;
}
/////////////////////////////// Pipe Plugin Management ///////////////////////////////
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index adc787f58f9..42de84cc9e5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -153,7 +153,7 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur
public void serialize(DataOutputStream stream) throws IOException {
stream.writeShort(ProcedureType.PIPE_HANDLE_LEADER_CHANGE_PROCEDURE.getTypeCode());
super.serialize(stream);
- stream.writeInt(dataRegionGroupToOldAndNewLeaderPairMap.size());
+ ReadWriteIOUtils.write(dataRegionGroupToOldAndNewLeaderPairMap.size(), stream);
for (Map.Entry<TConsensusGroupId, Pair<Integer, Integer>> entry :
dataRegionGroupToOldAndNewLeaderPairMap.entrySet()) {
ReadWriteIOUtils.write(entry.getKey().getId(), stream);
@@ -165,7 +165,7 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur
@Override
public void deserialize(ByteBuffer byteBuffer) {
super.deserialize(byteBuffer);
- final int size = byteBuffer.getInt();
+ final int size = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < size; ++i) {
final int dataRegionGroupId = ReadWriteIOUtils.readInt(byteBuffer);
final int oldDataRegionLeaderId = ReadWriteIOUtils.readInt(byteBuffer);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
index 15b36ecc8dd..402a9d43bf3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
@@ -208,7 +208,7 @@ public abstract class AbstractOperatePipeProcedureV2
@Override
public void serialize(DataOutputStream stream) throws IOException {
super.serialize(stream);
- stream.writeBoolean(isRollbackFromOperateOnDataNodesSuccessful);
+ ReadWriteIOUtils.write(isRollbackFromOperateOnDataNodesSuccessful, stream);
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index f3bc82dd725..ec45d1c26b4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -180,26 +180,26 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
stream.writeShort(ProcedureType.CREATE_PIPE_PROCEDURE_V2.getTypeCode());
super.serialize(stream);
ReadWriteIOUtils.write(createPipeRequest.getPipeName(), stream);
- stream.writeInt(createPipeRequest.getCollectorAttributesSize());
+ ReadWriteIOUtils.write(createPipeRequest.getCollectorAttributesSize(), stream);
for (Map.Entry<String, String> entry : createPipeRequest.getCollectorAttributes().entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), stream);
ReadWriteIOUtils.write(entry.getValue(), stream);
}
- stream.writeInt(createPipeRequest.getProcessorAttributesSize());
+ ReadWriteIOUtils.write(createPipeRequest.getProcessorAttributesSize(), stream);
for (Map.Entry<String, String> entry : createPipeRequest.getProcessorAttributes().entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), stream);
ReadWriteIOUtils.write(entry.getValue(), stream);
}
- stream.writeInt(createPipeRequest.getConnectorAttributesSize());
+ ReadWriteIOUtils.write(createPipeRequest.getConnectorAttributesSize(), stream);
for (Map.Entry<String, String> entry : createPipeRequest.getConnectorAttributes().entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), stream);
ReadWriteIOUtils.write(entry.getValue(), stream);
}
if (pipeStaticMeta != null) {
- stream.writeBoolean(true);
+ ReadWriteIOUtils.write(true, stream);
pipeStaticMeta.serialize(stream);
} else {
- stream.writeBoolean(false);
+ ReadWriteIOUtils.write(false, stream);
}
}
@@ -212,19 +212,19 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
.setCollectorAttributes(new HashMap<>())
.setProcessorAttributes(new HashMap<>())
.setConnectorAttributes(new HashMap<>());
- int size = byteBuffer.getInt();
+ int size = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < size; ++i) {
createPipeRequest
.getCollectorAttributes()
.put(ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readString(byteBuffer));
}
- size = byteBuffer.getInt();
+ size = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < size; ++i) {
createPipeRequest
.getProcessorAttributes()
.put(ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readString(byteBuffer));
}
- size = byteBuffer.getInt();
+ size = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < size; ++i) {
createPipeRequest
.getConnectorAttributes()
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index 724a86f19cb..a24475fc8c9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -200,6 +200,16 @@ public class ProcedureFactory implements IProcedureFactory {
return ProcedureType.CREATE_MODEL_PROCEDURE;
} else if (procedure instanceof DropModelProcedure) {
return ProcedureType.DROP_MODEL_PROCEDURE;
+ } else if (procedure instanceof CreatePipeProcedureV2) {
+ return ProcedureType.CREATE_PIPE_PROCEDURE_V2;
+ } else if (procedure instanceof StartPipeProcedureV2) {
+ return ProcedureType.START_PIPE_PROCEDURE_V2;
+ } else if (procedure instanceof StopPipeProcedureV2) {
+ return ProcedureType.STOP_PIPE_PROCEDURE_V2;
+ } else if (procedure instanceof DropPipeProcedureV2) {
+ return ProcedureType.DROP_PIPE_PROCEDURE_V2;
+ } else if (procedure instanceof PipeHandleLeaderChangeProcedure) {
+ return ProcedureType.PIPE_HANDLE_LEADER_CHANGE_PROCEDURE;
}
return null;
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
index 1fa610cc35c..8b15b0ffa9c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.commons.pipe.task.meta;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
import java.io.FileInputStream;
@@ -55,13 +54,13 @@ public class PipeMeta {
}
public void serialize(DataOutputStream outputStream) throws IOException {
- ReadWriteIOUtils.write(staticMeta.serialize(), outputStream);
- ReadWriteIOUtils.write(runtimeMeta.serialize(), outputStream);
+ staticMeta.serialize(outputStream);
+ runtimeMeta.serialize(outputStream);
}
public void serialize(FileOutputStream outputStream) throws IOException {
- ReadWriteIOUtils.write(staticMeta.serialize(), outputStream);
- ReadWriteIOUtils.write(runtimeMeta.serialize(), outputStream);
+ staticMeta.serialize(outputStream);
+ runtimeMeta.serialize(outputStream);
}
public static PipeMeta deserialize(FileInputStream fileInputStream) throws IOException {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMeta.java
index 683177573ad..54f664c3953 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMeta.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMeta.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
@@ -74,9 +75,31 @@ public class PipeRuntimeMeta {
}
}
+ public void serialize(FileOutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write(status.get().getType(), outputStream);
+
+ ReadWriteIOUtils.write(consensusGroupIdToTaskMetaMap.size(), outputStream);
+ for (Map.Entry<TConsensusGroupId, PipeTaskMeta> entry :
+ consensusGroupIdToTaskMetaMap.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey().getId(), outputStream);
+ entry.getValue().serialize(outputStream);
+ }
+ }
+
public static PipeRuntimeMeta deserialize(InputStream inputStream) throws IOException {
- return deserialize(
- ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(inputStream)));
+ final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta();
+
+ pipeRuntimeMeta.status.set(PipeStatus.getPipeStatus(ReadWriteIOUtils.readByte(inputStream)));
+
+ final int size = ReadWriteIOUtils.readInt(inputStream);
+ for (int i = 0; i < size; ++i) {
+ pipeRuntimeMeta.consensusGroupIdToTaskMetaMap.put(
+ new TConsensusGroupId(
+ TConsensusGroupType.DataRegion, ReadWriteIOUtils.readInt(inputStream)),
+ PipeTaskMeta.deserialize(inputStream));
+ }
+
+ return pipeRuntimeMeta;
}
public static PipeRuntimeMeta deserialize(ByteBuffer byteBuffer) {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
index f487de445cb..df8be6538a2 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
@@ -47,7 +48,7 @@ public class PipeStaticMeta {
Map<String, String> collectorAttributes,
Map<String, String> processorAttributes,
Map<String, String> connectorAttributes) {
- this.pipeName = pipeName.toUpperCase();
+ this.pipeName = pipeName;
this.creationTime = creationTime;
collectorParameters = new PipeParameters(collectorAttributes);
processorParameters = new PipeParameters(processorAttributes);
@@ -85,17 +86,38 @@ public class PipeStaticMeta {
ReadWriteIOUtils.write(pipeName, outputStream);
ReadWriteIOUtils.write(creationTime, outputStream);
- outputStream.writeInt(collectorParameters.getAttribute().size());
+ ReadWriteIOUtils.write(collectorParameters.getAttribute().size(), outputStream);
for (Map.Entry<String, String> entry : collectorParameters.getAttribute().entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
ReadWriteIOUtils.write(entry.getValue(), outputStream);
}
- outputStream.writeInt(processorParameters.getAttribute().size());
+ ReadWriteIOUtils.write(processorParameters.getAttribute().size(), outputStream);
for (Map.Entry<String, String> entry : processorParameters.getAttribute().entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
ReadWriteIOUtils.write(entry.getValue(), outputStream);
}
- outputStream.writeInt(connectorParameters.getAttribute().size());
+ ReadWriteIOUtils.write(connectorParameters.getAttribute().size(), outputStream);
+ for (Map.Entry<String, String> entry : connectorParameters.getAttribute().entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), outputStream);
+ ReadWriteIOUtils.write(entry.getValue(), outputStream);
+ }
+ }
+
+ public void serialize(FileOutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write(pipeName, outputStream);
+ ReadWriteIOUtils.write(creationTime, outputStream);
+
+ ReadWriteIOUtils.write(collectorParameters.getAttribute().size(), outputStream);
+ for (Map.Entry<String, String> entry : collectorParameters.getAttribute().entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), outputStream);
+ ReadWriteIOUtils.write(entry.getValue(), outputStream);
+ }
+ ReadWriteIOUtils.write(processorParameters.getAttribute().size(), outputStream);
+ for (Map.Entry<String, String> entry : processorParameters.getAttribute().entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), outputStream);
+ ReadWriteIOUtils.write(entry.getValue(), outputStream);
+ }
+ ReadWriteIOUtils.write(connectorParameters.getAttribute().size(), outputStream);
for (Map.Entry<String, String> entry : connectorParameters.getAttribute().entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
ReadWriteIOUtils.write(entry.getValue(), outputStream);
@@ -103,8 +125,35 @@ public class PipeStaticMeta {
}
public static PipeStaticMeta deserialize(InputStream inputStream) throws IOException {
- return deserialize(
- ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(inputStream)));
+ final PipeStaticMeta pipeStaticMeta = new PipeStaticMeta();
+
+ pipeStaticMeta.pipeName = ReadWriteIOUtils.readString(inputStream);
+ pipeStaticMeta.creationTime = ReadWriteIOUtils.readLong(inputStream);
+
+ pipeStaticMeta.collectorParameters = new PipeParameters(new HashMap<>());
+ pipeStaticMeta.processorParameters = new PipeParameters(new HashMap<>());
+ pipeStaticMeta.connectorParameters = new PipeParameters(new HashMap<>());
+
+ int size = ReadWriteIOUtils.readInt(inputStream);
+ for (int i = 0; i < size; ++i) {
+ final String key = ReadWriteIOUtils.readString(inputStream);
+ final String value = ReadWriteIOUtils.readString(inputStream);
+ pipeStaticMeta.collectorParameters.getAttribute().put(key, value);
+ }
+ size = ReadWriteIOUtils.readInt(inputStream);
+ for (int i = 0; i < size; ++i) {
+ final String key = ReadWriteIOUtils.readString(inputStream);
+ final String value = ReadWriteIOUtils.readString(inputStream);
+ pipeStaticMeta.processorParameters.getAttribute().put(key, value);
+ }
+ size = ReadWriteIOUtils.readInt(inputStream);
+ for (int i = 0; i < size; ++i) {
+ final String key = ReadWriteIOUtils.readString(inputStream);
+ final String value = ReadWriteIOUtils.readString(inputStream);
+ pipeStaticMeta.connectorParameters.getAttribute().put(key, value);
+ }
+
+ return pipeStaticMeta;
}
public static PipeStaticMeta deserialize(ByteBuffer byteBuffer) {
@@ -117,26 +166,23 @@ public class PipeStaticMeta {
pipeStaticMeta.processorParameters = new PipeParameters(new HashMap<>());
pipeStaticMeta.connectorParameters = new PipeParameters(new HashMap<>());
- int size = byteBuffer.getInt();
+ int size = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < size; ++i) {
- pipeStaticMeta
- .collectorParameters
- .getAttribute()
- .put(ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readString(byteBuffer));
+ final String key = ReadWriteIOUtils.readString(byteBuffer);
+ final String value = ReadWriteIOUtils.readString(byteBuffer);
+ pipeStaticMeta.collectorParameters.getAttribute().put(key, value);
}
- size = byteBuffer.getInt();
+ size = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < size; ++i) {
- pipeStaticMeta
- .processorParameters
- .getAttribute()
- .put(ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readString(byteBuffer));
+ final String key = ReadWriteIOUtils.readString(byteBuffer);
+ final String value = ReadWriteIOUtils.readString(byteBuffer);
+ pipeStaticMeta.processorParameters.getAttribute().put(key, value);
}
- size = byteBuffer.getInt();
+ size = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < size; ++i) {
- pipeStaticMeta
- .connectorParameters
- .getAttribute()
- .put(ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readString(byteBuffer));
+ final String key = ReadWriteIOUtils.readString(byteBuffer);
+ final String value = ReadWriteIOUtils.readString(byteBuffer);
+ pipeStaticMeta.connectorParameters.getAttribute().put(key, value);
}
return pipeStaticMeta;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
index 1131047c104..213c3da3a22 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
@@ -93,6 +94,17 @@ public class PipeTaskMeta {
}
}
+ public void serialize(FileOutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write(progressIndex.get(), outputStream);
+ ReadWriteIOUtils.write(regionLeader.get(), outputStream);
+ ReadWriteIOUtils.write(exceptionMessages.size(), outputStream);
+ for (final PipeRuntimeException exceptionMessage : exceptionMessages) {
+ ReadWriteIOUtils.write(
+ exceptionMessage instanceof PipeRuntimeCriticalException, outputStream);
+ ReadWriteIOUtils.write(exceptionMessage.getMessage(), outputStream);
+ }
+ }
+
public static PipeTaskMeta deserialize(ByteBuffer byteBuffer) {
final PipeTaskMeta PipeTaskMeta = new PipeTaskMeta();
PipeTaskMeta.progressIndex.set(ReadWriteIOUtils.readLong(byteBuffer));
@@ -110,8 +122,19 @@ public class PipeTaskMeta {
}
public static PipeTaskMeta deserialize(InputStream inputStream) throws IOException {
- return deserialize(
- ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(inputStream)));
+ final PipeTaskMeta PipeTaskMeta = new PipeTaskMeta();
+ PipeTaskMeta.progressIndex.set(ReadWriteIOUtils.readLong(inputStream));
+ PipeTaskMeta.regionLeader.set(ReadWriteIOUtils.readInt(inputStream));
+ final int size = ReadWriteIOUtils.readInt(inputStream);
+ for (int i = 0; i < size; ++i) {
+ final boolean critical = ReadWriteIOUtils.readBool(inputStream);
+ final String message = ReadWriteIOUtils.readString(inputStream);
+ PipeTaskMeta.exceptionMessages.add(
+ critical
+ ? new PipeRuntimeCriticalException(message)
+ : new PipeRuntimeNonCriticalException(message));
+ }
+ return PipeTaskMeta;
}
@Override
diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaDeSerTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaDeSerTest.java
new file mode 100644
index 00000000000..08e7f86007c
--- /dev/null
+++ b/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaDeSerTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.task.meta;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+
+public class PipeMetaDeSerTest {
+
+ @Test
+ public void test() throws IOException {
+ PipeStaticMeta pipeStaticMeta =
+ new PipeStaticMeta(
+ "pipeName",
+ 123L,
+ new HashMap() {
+ {
+ put("collector-key", "collector-value");
+ }
+ },
+ new HashMap() {
+ {
+ put("processor-key-1", "processor-value-1");
+ put("processor-key-2", "processor-value-2");
+ }
+ },
+ new HashMap() {});
+ ByteBuffer staticByteBuffer = pipeStaticMeta.serialize();
+ PipeStaticMeta pipeStaticMeta1 = PipeStaticMeta.deserialize(staticByteBuffer);
+ Assert.assertEquals(pipeStaticMeta, pipeStaticMeta1);
+
+ PipeRuntimeMeta pipeRuntimeMeta =
+ new PipeRuntimeMeta(
+ new HashMap() {
+ {
+ put(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 456),
+ new PipeTaskMeta(789, 987));
+ put(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 123),
+ new PipeTaskMeta(456, 789));
+ }
+ });
+ ByteBuffer runtimeByteBuffer = pipeRuntimeMeta.serialize();
+ PipeRuntimeMeta pipeRuntimeMeta1 = PipeRuntimeMeta.deserialize(runtimeByteBuffer);
+ Assert.assertEquals(pipeRuntimeMeta, pipeRuntimeMeta1);
+
+ PipeMeta pipeMeta = new PipeMeta(pipeStaticMeta, pipeRuntimeMeta);
+ ByteBuffer byteBuffer = pipeMeta.serialize();
+ PipeMeta pipeMeta1 = PipeMeta.deserialize(byteBuffer);
+ Assert.assertEquals(pipeMeta, pipeMeta1);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 3119043f3f6..5fb4f5b7b5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -85,7 +85,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowThrottleReq;
@@ -1645,8 +1644,8 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
if (showPipeStatement.getWhereClause()) {
tShowPipeReq.setWhereClause(true);
}
- TShowPipeResp resp = configNodeClient.showPipe(tShowPipeReq);
- List<TShowPipeInfo> tShowPipeInfoList = new ArrayList<>();
+ List<TShowPipeInfo> tShowPipeInfoList =
+ configNodeClient.showPipe(tShowPipeReq).getPipeInfoList();
ShowPipeTask.buildTSBlock(tShowPipeInfoList, future);
} catch (Exception e) {
future.setException(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index bea8913d268..82b7b494f91 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -3384,7 +3384,9 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
Map<String, String> collectorMap = new HashMap<>();
for (IoTDBSqlParser.CollectorAttributeClauseContext singleCtx :
ctx.collectorAttributeClause()) {
- collectorMap.put(singleCtx.collectorKey.getText(), singleCtx.collectorValue.getText());
+ collectorMap.put(
+ parseStringLiteral(singleCtx.collectorKey.getText()),
+ parseStringLiteral(singleCtx.collectorValue.getText()));
}
return collectorMap;
}
@@ -3394,7 +3396,9 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
Map<String, String> processorMap = new HashMap<>();
for (IoTDBSqlParser.ProcessorAttributeClauseContext singleCtx :
ctx.processorAttributeClause()) {
- processorMap.put(singleCtx.processorKey.getText(), singleCtx.processorValue.getText());
+ processorMap.put(
+ parseStringLiteral(singleCtx.processorKey.getText()),
+ parseStringLiteral(singleCtx.processorValue.getText()));
}
return processorMap;
}
@@ -3404,7 +3408,9 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
Map<String, String> connectorMap = new HashMap<>();
for (IoTDBSqlParser.ConnectorAttributeClauseContext singleCtx :
ctx.connectorAttributeClause()) {
- connectorMap.put(singleCtx.connectorKey.getText(), singleCtx.connectorValue.getText());
+ connectorMap.put(
+ parseStringLiteral(singleCtx.connectorKey.getText()),
+ parseStringLiteral(singleCtx.connectorValue.getText()));
}
return connectorMap;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
index 5906de3a49a..06485b8cc2e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
@@ -22,7 +22,10 @@ package org.apache.iotdb.db.pipe.config;
public class PipeCollectorConstant {
public static final String COLLECTOR_KEY = "collector";
- public static final String PATTERN_PATTERN_KEY = "collector.pattern";
+
+ public static final String COLLECTOR_PATTERN_KEY = "collector.pattern";
+ public static final String COLLECTOR_PATTERN_DEFAULT_VALUE = "root";
+
public static final String DATA_REGION_KEY = "collector.data-region";
private PipeCollectorConstant() {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
index 08e731df136..41e9f5d76fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
@@ -34,14 +34,16 @@ public abstract class PipeRealtimeDataRegionCollector implements PipeCollector {
@Override
public void validate(PipeParameterValidator validator) throws Exception {
- validator.validateRequiredAttribute(PipeCollectorConstant.PATTERN_PATTERN_KEY);
validator.validateRequiredAttribute(PipeCollectorConstant.DATA_REGION_KEY);
}
@Override
public void customize(
PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {
- pattern = parameters.getString(PipeCollectorConstant.PATTERN_PATTERN_KEY);
+ pattern =
+ parameters.getStringOrDefault(
+ PipeCollectorConstant.COLLECTOR_PATTERN_KEY,
+ PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE);
dataRegionId = parameters.getString(PipeCollectorConstant.DATA_REGION_KEY);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
index 070e4e05b2c..33a298c5d02 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
@@ -24,9 +24,12 @@ import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
import java.util.HashMap;
import java.util.Map;
@@ -38,13 +41,27 @@ public class PipeConnectorSubtaskManager {
attributeSortedString2SubtaskLifeCycleMap = new HashMap<>();
public synchronized String register(
- PipeConnectorSubtaskExecutor executor, PipeParameters connectorAttributes) {
+ PipeConnectorSubtaskExecutor executor, PipeParameters pipeConnectorParameters) {
final String attributeSortedString =
- new TreeMap<>(connectorAttributes.getAttribute()).toString();
+ new TreeMap<>(pipeConnectorParameters.getAttribute()).toString();
if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
- final PipeConnector pipeConnector = PipeAgent.plugin().reflectConnector(connectorAttributes);
+ // 1. construct, validate and customize PipeConnector
+ final PipeConnector pipeConnector =
+ PipeAgent.plugin().reflectConnector(pipeConnectorParameters);
+ try {
+ pipeConnector.validate(new PipeParameterValidator(pipeConnectorParameters));
+ final PipeConnectorRuntimeConfiguration runtimeConfiguration =
+ new PipeConnectorRuntimeConfiguration();
+ pipeConnector.customize(pipeConnectorParameters, runtimeConfiguration);
+ // TODO: use runtimeConfiguration to configure PipeConnector
+ } catch (Exception e) {
+ throw new PipeManagementException(
+ "Failed to construct PipeConnector, because of " + e.getMessage(), e);
+ }
+
// TODO: make pendingQueue size configurable
+ // 2. construct PipeConnectorSubtaskLifeCycle to manage PipeConnectorSubtask's life cycle
final ListenableBlockingPendingQueue<Event> pendingQueue =
new ListenableBlockingPendingQueue<>(65535);
final PipeConnectorSubtask pipeConnectorSubtask =
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index 73c127f3dab..31e935256ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -26,7 +26,9 @@ import org.apache.iotdb.db.pipe.core.collector.IoTDBDataRegionCollector;
import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
import org.apache.iotdb.pipe.api.PipeCollector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -47,17 +49,14 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
*/
private ListenableUnblockingPendingQueue<Event> collectorPendingQueue;
- private PipeCollector pipeCollector;
+ private final PipeCollector pipeCollector;
public PipeTaskCollectorStage(String dataRegionId, PipeParameters collectorParameters) {
this.collectorParameters = collectorParameters;
// set data region id to collector parameters, so that collector can get data region id inside
// collector
collectorParameters.getAttribute().put(PipeCollectorConstant.DATA_REGION_KEY, dataRegionId);
- }
- @Override
- public void createSubtask() throws PipeException {
if (collectorParameters
.getStringOrDefault(
PipeCollectorConstant.COLLECTOR_KEY,
@@ -70,6 +69,22 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
}
}
+ @Override
+ public void createSubtask() throws PipeException {
+ try {
+ // 1. validate collector parameters
+ pipeCollector.validate(new PipeParameterValidator(collectorParameters));
+
+ // 2. customize collector
+ final PipeCollectorRuntimeConfiguration runtimeConfiguration =
+ new PipeCollectorRuntimeConfiguration();
+ pipeCollector.customize(collectorParameters, runtimeConfiguration);
+ // TODO: use runtimeConfiguration to configure collector
+ } catch (Exception e) {
+ throw new PipeException(e.getMessage(), e);
+ }
+ }
+
@Override
public void startSubtask() throws PipeException {
try {
@@ -94,7 +109,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
}
public EventSupplier getEventSupplier() {
- return () -> pipeCollector.supply();
+ return pipeCollector::supply;
}
public ListenableUnblockingPendingQueue<Event> getCollectorPendingQueue() {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 77bc2de5f44..3ba031a23a4 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -29,7 +29,9 @@ import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
import org.apache.iotdb.db.pipe.task.queue.ListenablePendingQueue;
import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -40,7 +42,9 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
protected final PipeProcessorSubtaskExecutor executor =
PipeSubtaskExecutorManager.getInstance().getProcessorSubtaskExecutor();
- protected final PipeProcessorSubtask subtask;
+ protected final PipeParameters pipeProcessorParameters;
+ protected final PipeProcessor pipeProcessor;
+ protected final PipeProcessorSubtask pipeProcessorSubtask;
protected final ListenablePendingQueue<Event> pipeCollectorInputPendingQueue;
protected final ListenablePendingQueue<Event> pipeConnectorOutputPendingQueue;
@@ -61,13 +65,14 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
@Nullable ListenablePendingQueue<Event> pipeCollectorInputPendingQueue,
PipeParameters pipeProcessorParameters,
ListenableBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue) {
+ this.pipeProcessorParameters = pipeProcessorParameters;
+
final String taskId = pipeName + "_" + dataRegionId;
- final PipeProcessor pipeProcessor =
- PipeAgent.plugin().reflectProcessor(pipeProcessorParameters);
+ pipeProcessor = PipeAgent.plugin().reflectProcessor(pipeProcessorParameters);
final PipeEventCollector pipeConnectorOutputEventCollector =
new PipeEventCollector(pipeConnectorOutputPendingQueue);
- this.subtask =
+ this.pipeProcessorSubtask =
new PipeProcessorSubtask(
taskId,
pipeCollectorInputEventSupplier,
@@ -81,43 +86,58 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
taskId,
() -> {
if (status == PipeStatus.RUNNING) {
- executor.start(subtask.getTaskID());
+ executor.start(pipeProcessorSubtask.getTaskID());
}
})
- .registerNotEmptyToEmptyListener(taskId, () -> executor.stop(subtask.getTaskID()))
+ .registerNotEmptyToEmptyListener(
+ taskId, () -> executor.stop(pipeProcessorSubtask.getTaskID()))
: null;
this.pipeConnectorOutputPendingQueue =
pipeConnectorOutputPendingQueue
- .registerNotFullToFullListener(taskId, () -> executor.stop(subtask.getTaskID()))
+ .registerNotFullToFullListener(
+ taskId, () -> executor.stop(pipeProcessorSubtask.getTaskID()))
.registerFullToNotFullListener(
taskId,
() -> {
// only start when the pipe is running
if (status == PipeStatus.RUNNING) {
pipeConnectorOutputEventCollector.tryCollectBufferedEvents();
- executor.start(subtask.getTaskID());
+ executor.start(pipeProcessorSubtask.getTaskID());
}
});
}
@Override
public void createSubtask() throws PipeException {
- executor.register(subtask);
+ try {
+ // 1. validate processor parameters
+ pipeProcessor.validate(new PipeParameterValidator(pipeProcessorParameters));
+
+ // 2. customize processor
+ final PipeProcessorRuntimeConfiguration runtimeConfiguration =
+ new PipeProcessorRuntimeConfiguration();
+ pipeProcessor.customize(pipeProcessorParameters, runtimeConfiguration);
+ // TODO: use runtimeConfiguration to configure processor
+ } catch (Exception e) {
+ throw new PipeException(e.getMessage(), e);
+ }
+
+ executor.register(pipeProcessorSubtask);
}
@Override
public void startSubtask() throws PipeException {
- executor.start(subtask.getTaskID());
+ executor.start(pipeProcessorSubtask.getTaskID());
}
@Override
public void stopSubtask() throws PipeException {
- executor.stop(subtask.getTaskID());
+ executor.stop(pipeProcessorSubtask.getTaskID());
}
@Override
public void dropSubtask() throws PipeException {
- final String taskId = subtask.getTaskID();
+ final String taskId = pipeProcessorSubtask.getTaskID();
if (pipeCollectorInputPendingQueue != null) {
pipeCollectorInputPendingQueue.removeEmptyToNotEmptyListener(taskId);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index c8931a521f8..6e0f82681a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -804,8 +804,10 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
@Override
public TSStatus pushPipeMeta(TPushPipeMetaReq req) {
- List<PipeMeta> pipeMetas = new ArrayList<>();
- req.getPipeMetas().forEach(byteBuffer -> pipeMetas.add(PipeMeta.deserialize(byteBuffer)));
+ final List<PipeMeta> pipeMetas = new ArrayList<>();
+ for (ByteBuffer byteBuffer : req.getPipeMetas()) {
+ pipeMetas.add(PipeMeta.deserialize(byteBuffer));
+ }
PipeAgent.task().handlePipeMetaChanges(pipeMetas);
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
index 741e4ddb1b1..d833e2761a0 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
@@ -69,7 +69,7 @@ public class CachedSchemaPatternMatcherTest {
new PipeParameters(
new HashMap<String, String>() {
{
- put(PipeCollectorConstant.PATTERN_PATTERN_KEY, "root");
+ put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, "root");
put(PipeCollectorConstant.DATA_REGION_KEY, "1");
}
}),
@@ -85,7 +85,7 @@ public class CachedSchemaPatternMatcherTest {
new PipeParameters(
new HashMap<String, String>() {
{
- put(PipeCollectorConstant.PATTERN_PATTERN_KEY, "root." + finalI1);
+ put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, "root." + finalI1);
put(PipeCollectorConstant.DATA_REGION_KEY, "1");
}
}),
@@ -99,7 +99,9 @@ public class CachedSchemaPatternMatcherTest {
new PipeParameters(
new HashMap<String, String>() {
{
- put(PipeCollectorConstant.PATTERN_PATTERN_KEY, "root." + finalI + "." + finalJ);
+ put(
+ PipeCollectorConstant.COLLECTOR_PATTERN_KEY,
+ "root." + finalI + "." + finalJ);
put(PipeCollectorConstant.DATA_REGION_KEY, "1");
}
}),
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index f4e7f7f6843..a2c3508b294 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -111,7 +111,7 @@ public class PipeRealtimeCollectTest {
new PipeParameters(
new HashMap<String, String>() {
{
- put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern1);
+ put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern1);
put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion1);
}
}),
@@ -120,7 +120,7 @@ public class PipeRealtimeCollectTest {
new PipeParameters(
new HashMap<String, String>() {
{
- put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern2);
+ put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern2);
put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion1);
}
}),
@@ -129,7 +129,7 @@ public class PipeRealtimeCollectTest {
new PipeParameters(
new HashMap<String, String>() {
{
- put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern1);
+ put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern1);
put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion2);
}
}),
@@ -138,7 +138,7 @@ public class PipeRealtimeCollectTest {
new PipeParameters(
new HashMap<String, String>() {
{
- put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern2);
+ put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern2);
put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion2);
}
}),