You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/04/18 08:47:29 UTC
[iotdb] branch master updated: [IOTDB-5727] pipe task management at config node (#9533)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 110ea55786 [IOTDB-5727] pipe task management at config node (#9533)
110ea55786 is described below
commit 110ea55786051c23d9bd2e422f1952f337c8ac68
Author: Caideyipi <87...@users.noreply.github.com>
AuthorDate: Tue Apr 18 16:47:17 2023 +0800
[IOTDB-5727] pipe task management at config node (#9533)
Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
.../confignode/client/DataNodeRequestType.java | 9 +-
.../client/async/AsyncDataNodeClientPool.java | 18 +-
.../client/async/handlers/AsyncClientHandler.java | 11 -
.../async/handlers/rpc/OperatePipeRPCHandler.java | 60 ----
.../consensus/request/ConfigPhysicalPlan.java | 56 ++--
.../consensus/request/ConfigPhysicalPlanType.java | 35 ++-
.../task/CreatePipePlanV2.java} | 41 +--
.../task/DropPipePlanV2.java} | 12 +-
.../task/SetPipeStatusPlanV2.java} | 34 +--
...PipeSinkPlan.java => CreatePipeSinkPlanV1.java} | 10 +-
.../{DropPipePlan.java => DropPipePlanV1.java} | 10 +-
...opPipeSinkPlan.java => DropPipeSinkPlanV1.java} | 10 +-
...GetPipeSinkPlan.java => GetPipeSinkPlanV1.java} | 10 +-
...reatePipePlan.java => PreCreatePipePlanV1.java} | 10 +-
.../request/write/sync/RecordPipeMessagePlan.java | 4 +-
...ipeStatusPlan.java => SetPipeStatusPlanV1.java} | 24 +-
.../{ShowPipePlan.java => ShowPipePlanV1.java} | 10 +-
.../iotdb/confignode/manager/ConfigManager.java | 103 ++-----
.../apache/iotdb/confignode/manager/IManager.java | 42 +--
.../iotdb/confignode/manager/ProcedureManager.java | 25 +-
.../confignode/manager/RetryFailedTasksThread.java | 64 +----
.../iotdb/confignode/manager/SyncManager.java | 306 ---------------------
.../iotdb/confignode/manager/load/LoadManager.java | 1 -
.../iotdb/confignode/manager/node/NodeManager.java | 6 +-
.../iotdb/confignode/manager/pipe/PipeManager.java | 19 +-
.../manager/pipe/PipePluginCoordinator.java | 8 +
.../manager/pipe/PipeTaskCoordinator.java | 69 +++++
.../persistence/executor/ConfigPlanExecutor.java | 48 ++--
.../confignode/persistence/pipe/PipeInfo.java | 59 +++-
.../persistence/pipe/PipePluginInfo.java | 45 ++-
.../confignode/persistence/pipe/PipeTaskInfo.java | 203 ++++++++++++++
.../persistence/pipe/PipeTaskOperation.java | 10 +-
.../persistence/sync/ClusterSyncInfo.java | 225 ---------------
.../procedure/env/ConfigNodeProcedureEnv.java | 25 ++
.../pipe/plugin/CreatePipePluginProcedure.java | 30 +-
.../impl/pipe/plugin/DropPipePluginProcedure.java | 24 +-
.../pipe/task/AbstractOperatePipeProcedureV2.java | 161 +++++++++++
.../impl/pipe/task/CreatePipeProcedureV2.java | 270 ++++++++++++++++++
.../impl/pipe/task/DropPipeProcedureV2.java | 157 +++++++++++
.../impl/pipe/task/StartPipeProcedureV2.java | 177 ++++++++++++
.../impl/pipe/task/StopPipeProcedureV2.java | 177 ++++++++++++
.../impl/sync/AbstractOperatePipeProcedure.java | 147 ----------
.../procedure/impl/sync/CreatePipeProcedure.java | 131 +--------
.../procedure/impl/sync/DropPipeProcedure.java | 97 +------
.../procedure/impl/sync/StartPipeProcedure.java | 136 +--------
.../procedure/impl/sync/StopPipeProcedure.java | 136 +--------
.../task/OperatePipeTaskState.java} | 13 +-
.../procedure/store/ProcedureFactory.java | 16 ++
.../confignode/procedure/store/ProcedureType.java | 10 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 16 +-
.../request/ConfigPhysicalPlanSerDeTest.java | 156 ++++++++---
.../persistence/ClusterSyncInfoTest.java | 164 -----------
.../iotdb/confignode/persistence/PipeInfoTest.java | 99 +++++++
.../CreatePipePluginProcedureTest.java | 3 +-
.../{ => plugin}/DropPipePluginProcedureTest.java | 3 +-
.../CreatePipeProcedureV2Test.java} | 33 ++-
.../DropPipeProcedureV2Test.java} | 12 +-
.../StartPipeProcedureV2Test.java} | 12 +-
.../StopPipeProcedureV2Test.java} | 12 +-
.../meta/ConfigNodePipePluginMetaKeeper.java | 26 +-
.../pipe/plugin/meta/PipePluginMetaKeeper.java | 22 ++
.../pipe/task/meta/PipeConsensusGroupTaskMeta.java | 98 +++++++
.../iotdb/commons/pipe/task/meta/PipeMeta.java | 95 +++++++
.../commons/pipe/task/meta/PipeMetaKeeper.java | 97 +++++++
.../commons/pipe/task/meta/PipeRuntimeMeta.java | 142 ++++++++++
.../commons/pipe/task/meta/PipeStaticMeta.java | 170 ++++++++++++
.../{PipeTaskMetaAccessor.java => PipeStatus.java} | 30 +-
.../iotdb/commons/sync/pipe/SyncOperation.java | 1 -
.../plan/statement/crud/InsertRowStatement.java | 2 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 41 +--
thrift/src/main/thrift/datanode.thrift | 8 +-
71 files changed, 2638 insertions(+), 1908 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index 00a9423d44..25f18a59a9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -66,10 +66,13 @@ public enum DataNodeRequestType {
CREATE_PIPE_PLUGIN,
DROP_PIPE_PLUGIN,
- /** Sync */
- PRE_CREATE_PIPE,
+ /** Pipe Task */
+ CREATE_PIPE,
+ /**
+ * DROP_PIPE, START_PIPE, STOP_PIPE Merge them into OPERATE_PIPE since these requests only require
+ * pipe name
+ */
OPERATE_PIPE,
- ROLLBACK_OPERATE_PIPE,
/** CQ */
EXECUTE_CQ,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index 5a9d51ac9b..c641cec211 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -219,6 +219,12 @@ public class AsyncDataNodeClientPool {
(AsyncTSStatusRPCHandler)
clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
break;
+ case CREATE_PIPE:
+ client.createPipeOnDataNode(
+ (TCreatePipeOnDataNodeReq) clientHandler.getRequest(requestId),
+ (AsyncTSStatusRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
case MERGE:
case FULL_MERGE:
client.merge(
@@ -301,24 +307,12 @@ public class AsyncDataNodeClientPool {
(DeleteSchemaRPCHandler)
clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
break;
- case PRE_CREATE_PIPE:
- client.createPipeOnDataNode(
- (TCreatePipeOnDataNodeReq) clientHandler.getRequest(requestId),
- (AsyncTSStatusRPCHandler)
- clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
- break;
case OPERATE_PIPE:
client.operatePipeOnDataNode(
(TOperatePipeOnDataNodeReq) clientHandler.getRequest(requestId),
(AsyncTSStatusRPCHandler)
clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
break;
- case ROLLBACK_OPERATE_PIPE:
- client.operatePipeOnDataNodeForRollback(
- (TOperatePipeOnDataNodeReq) clientHandler.getRequest(requestId),
- (AsyncTSStatusRPCHandler)
- clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
- break;
case CONSTRUCT_SCHEMA_BLACK_LIST_WITH_TEMPLATE:
client.constructSchemaBlackListWithTemplate(
(TConstructSchemaBlackListWithTemplateReq) clientHandler.getRequest(requestId),
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
index a5b9422ac8..b807b52c5a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.confignode.client.async.handlers.rpc.AsyncTSStatusRPCHan
import org.apache.iotdb.confignode.client.async.handlers.rpc.CountPathsUsingTemplateRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.DeleteSchemaRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.FetchSchemaBlackListRPCHandler;
-import org.apache.iotdb.confignode.client.async.handlers.rpc.OperatePipeRPCHandler;
import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateResp;
import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
@@ -176,16 +175,6 @@ public class AsyncClientHandler<Q, R> {
dataNodeLocationMap,
(Map<Integer, TFetchSchemaBlackListResp>) responseMap,
countDownLatch);
- case PRE_CREATE_PIPE:
- case OPERATE_PIPE:
- case ROLLBACK_OPERATE_PIPE:
- return new OperatePipeRPCHandler(
- requestType,
- requestId,
- targetDataNode,
- dataNodeLocationMap,
- (Map<Integer, TSStatus>) responseMap,
- countDownLatch);
case COUNT_PATHS_USING_TEMPLATE:
return new CountPathsUsingTemplateRPCHandler(
requestType,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/OperatePipeRPCHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/OperatePipeRPCHandler.java
deleted file mode 100644
index 7adf508562..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/OperatePipeRPCHandler.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.client.async.handlers.rpc;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-public class OperatePipeRPCHandler extends AsyncTSStatusRPCHandler {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(OperatePipeRPCHandler.class);
-
- public OperatePipeRPCHandler(
- DataNodeRequestType requestType,
- int requestId,
- TDataNodeLocation targetDataNode,
- Map<Integer, TDataNodeLocation> dataNodeLocationMap,
- Map<Integer, TSStatus> responseMap,
- CountDownLatch countDownLatch) {
- super(requestType, requestId, targetDataNode, dataNodeLocationMap, responseMap, countDownLatch);
- }
-
- @Override
- public void onError(Exception e) {
- String errorMsg = e.getMessage() + " on DataNode: " + formattedTargetLocation;
- LOGGER.error(errorMsg);
-
- responseMap.put(
- requestId,
- new TSStatus(
- RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg)));
-
- // Always CountDown
- countDownLatch.countDown();
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index aef3c33276..87e2a32258 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -77,6 +77,9 @@ import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchem
import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
import org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
import org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
@@ -85,14 +88,14 @@ import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGr
import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.PollRegionMaintainTaskPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRegionMaintainTaskPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlanV1;
+import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipePlanV1;
+import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlanV1;
+import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlanV1;
+import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlanV1;
import org.apache.iotdb.confignode.consensus.request.write.sync.RecordPipeMessagePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlanV1;
+import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlanV1;
import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.DropSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.PreUnsetSchemaTemplatePlan;
@@ -339,30 +342,39 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest {
case UpdateRegionLocation:
plan = new UpdateRegionLocationPlan();
break;
- case CreatePipeSink:
- plan = new CreatePipeSinkPlan();
+ case CreatePipeSinkV1:
+ plan = new CreatePipeSinkPlanV1();
break;
- case DropPipeSink:
- plan = new DropPipeSinkPlan();
+ case DropPipeSinkV1:
+ plan = new DropPipeSinkPlanV1();
break;
- case GetPipeSink:
- plan = new GetPipeSinkPlan();
+ case GetPipeSinkV1:
+ plan = new GetPipeSinkPlanV1();
break;
- case PreCreatePipe:
- plan = new PreCreatePipePlan();
+ case PreCreatePipeV1:
+ plan = new PreCreatePipePlanV1();
break;
- case SetPipeStatus:
- plan = new SetPipeStatusPlan();
+ case SetPipeStatusV1:
+ plan = new SetPipeStatusPlanV1();
break;
- case DropPipe:
- plan = new DropPipePlan();
+ case DropPipeV1:
+ plan = new DropPipePlanV1();
break;
- case ShowPipe:
- plan = new ShowPipePlan();
+ case ShowPipeV1:
+ plan = new ShowPipePlanV1();
break;
- case RecordPipeMessage:
+ case RecordPipeMessageV1:
plan = new RecordPipeMessagePlan();
break;
+ case CreatePipeV2:
+ plan = new CreatePipePlanV2();
+ break;
+ case SetPipeStatusV2:
+ plan = new SetPipeStatusPlanV2();
+ break;
+ case DropPipeV2:
+ plan = new DropPipePlanV2();
+ break;
case GetRegionId:
plan = new GetRegionIdPlan();
break;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index b391c3c086..32efc881a7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -114,15 +114,23 @@ public enum ConfigPhysicalPlanType {
UnsetTemplate((short) 810),
DropSchemaTemplate((short) 811),
- /** Sync */
- CreatePipeSink((short) 900),
- DropPipeSink((short) 901),
- GetPipeSink((short) 902),
- PreCreatePipe((short) 903),
- SetPipeStatus((short) 904),
- DropPipe((short) 905),
- ShowPipe((short) 906),
- RecordPipeMessage((short) 907),
+ /** Deprecated types for sync, restored them for upgrade */
+ @Deprecated
+ CreatePipeSinkV1((short) 900),
+ @Deprecated
+ DropPipeSinkV1((short) 901),
+ @Deprecated
+ GetPipeSinkV1((short) 902),
+ @Deprecated
+ PreCreatePipeV1((short) 903),
+ @Deprecated
+ SetPipeStatusV1((short) 904),
+ @Deprecated
+ DropPipeV1((short) 905),
+ @Deprecated
+ ShowPipeV1((short) 906),
+ @Deprecated
+ RecordPipeMessageV1((short) 907),
/** Trigger */
AddTriggerInTable((short) 1000),
@@ -158,7 +166,14 @@ public enum ConfigPhysicalPlanType {
/** Quota */
setSpaceQuota((short) 1400),
- setThrottleQuota((short) 1401);
+ setThrottleQuota((short) 1401),
+
+ /** Pipe Task */
+ CreatePipeV2((short) 1500),
+ /** START PIPE & STOP PIPE */
+ SetPipeStatusV2((short) 1501),
+ DropPipeV2((short) 1502),
+ ;
private final short planType;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/RecordPipeMessagePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/CreatePipePlanV2.java
similarity index 56%
copy from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/RecordPipeMessagePlan.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/CreatePipePlanV2.java
index d29aa92a97..c37f77cdb1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/RecordPipeMessagePlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/CreatePipePlanV2.java
@@ -16,10 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.consensus.request.write.sync;
-import org.apache.iotdb.commons.sync.pipe.PipeMessage;
-import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
+package org.apache.iotdb.confignode.consensus.request.write.pipe.task;
+
+import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
@@ -27,39 +28,39 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-public class RecordPipeMessagePlan extends ConfigPhysicalPlan {
+public class CreatePipePlanV2 extends ConfigPhysicalPlan {
- private String pipeName;
- private PipeMessage pipeMessage;
+ private PipeStaticMeta pipeStaticMeta;
+ private PipeRuntimeMeta pipeRuntimeMeta;
- public RecordPipeMessagePlan() {
- super(ConfigPhysicalPlanType.RecordPipeMessage);
+ public CreatePipePlanV2() {
+ super(ConfigPhysicalPlanType.CreatePipeV2);
}
- public RecordPipeMessagePlan(String pipeName, PipeMessage pipeMessage) {
- this();
- this.pipeName = pipeName;
- this.pipeMessage = pipeMessage;
+ public CreatePipePlanV2(PipeStaticMeta pipeStaticMeta, PipeRuntimeMeta pipeRuntimeMeta) {
+ super(ConfigPhysicalPlanType.CreatePipeV2);
+ this.pipeStaticMeta = pipeStaticMeta;
+ this.pipeRuntimeMeta = pipeRuntimeMeta;
}
- public String getPipeName() {
- return pipeName;
+ public PipeStaticMeta getPipeStaticMeta() {
+ return pipeStaticMeta;
}
- public PipeMessage getPipeMessage() {
- return pipeMessage;
+ public PipeRuntimeMeta getPipeRuntimeMeta() {
+ return pipeRuntimeMeta;
}
@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
stream.writeShort(getType().getPlanType());
- BasicStructureSerDeUtil.write(pipeName, stream);
- pipeMessage.serialize(stream);
+ pipeStaticMeta.serialize(stream);
+ pipeRuntimeMeta.serialize(stream);
}
@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
- pipeName = BasicStructureSerDeUtil.readString(buffer);
- pipeMessage = PipeMessage.deserialize(buffer);
+ pipeStaticMeta = PipeStaticMeta.deserialize(buffer);
+ pipeRuntimeMeta = PipeRuntimeMeta.deserialize(buffer);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/DropPipePlanV2.java
similarity index 84%
copy from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipePlan.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/DropPipePlanV2.java
index 96f0dd9f53..15b0ea8440 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipePlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/DropPipePlanV2.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.consensus.request.write.sync;
+package org.apache.iotdb.confignode.consensus.request.write.pipe.task;
import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
@@ -26,16 +26,16 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-public class DropPipePlan extends ConfigPhysicalPlan {
+public class DropPipePlanV2 extends ConfigPhysicalPlan {
private String pipeName;
- public DropPipePlan() {
- super(ConfigPhysicalPlanType.DropPipe);
+ public DropPipePlanV2() {
+ super(ConfigPhysicalPlanType.DropPipeV2);
}
- public DropPipePlan(String pipeName) {
- this();
+ public DropPipePlanV2(String pipeName) {
+ super(ConfigPhysicalPlanType.DropPipeV2);
this.pipeName = pipeName;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/SetPipeStatusPlanV2.java
similarity index 68%
copy from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlan.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/SetPipeStatusPlanV2.java
index 999ba3aea5..d785204688 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/SetPipeStatusPlanV2.java
@@ -16,9 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.consensus.request.write.sync;
-import org.apache.iotdb.commons.sync.pipe.PipeStatus;
+package org.apache.iotdb.confignode.consensus.request.write.pipe.task;
+
+import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -27,46 +28,39 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-public class SetPipeStatusPlan extends ConfigPhysicalPlan {
+public class SetPipeStatusPlanV2 extends ConfigPhysicalPlan {
+
private String pipeName;
- private PipeStatus pipeStatus;
+ private PipeStatus status;
- public SetPipeStatusPlan() {
- super(ConfigPhysicalPlanType.SetPipeStatus);
+ public SetPipeStatusPlanV2() {
+ super(ConfigPhysicalPlanType.SetPipeStatusV2);
}
- public SetPipeStatusPlan(String pipeName, PipeStatus pipeStatus) {
- this();
+ public SetPipeStatusPlanV2(String pipeName, PipeStatus status) {
+ super(ConfigPhysicalPlanType.SetPipeStatusV2);
this.pipeName = pipeName;
- this.pipeStatus = pipeStatus;
+ this.status = status;
}
public String getPipeName() {
return pipeName;
}
- public void setPipeName(String pipeName) {
- this.pipeName = pipeName;
- }
-
public PipeStatus getPipeStatus() {
- return pipeStatus;
- }
-
- public void setPipeStatus(PipeStatus pipeStatus) {
- this.pipeStatus = pipeStatus;
+ return status;
}
@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
stream.writeShort(getType().getPlanType());
ReadWriteIOUtils.write(pipeName, stream);
- ReadWriteIOUtils.write((byte) pipeStatus.ordinal(), stream);
+ ReadWriteIOUtils.write(status.getType(), stream);
}
@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
pipeName = ReadWriteIOUtils.readString(buffer);
- pipeStatus = PipeStatus.values()[ReadWriteIOUtils.readByte(buffer)];
+ status = PipeStatus.getPipeStatus(ReadWriteIOUtils.readByte(buffer));
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/CreatePipeSinkPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/CreatePipeSinkPlanV1.java
similarity index 87%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/CreatePipeSinkPlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/CreatePipeSinkPlanV1.java
index 1729f96765..d357ccf8a3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/CreatePipeSinkPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/CreatePipeSinkPlanV1.java
@@ -27,15 +27,17 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-public class CreatePipeSinkPlan extends ConfigPhysicalPlan {
+// Deprecated, restored for upgrade
+@Deprecated
+public class CreatePipeSinkPlanV1 extends ConfigPhysicalPlan {
private TPipeSinkInfo pipeSinkInfo;
- public CreatePipeSinkPlan() {
- super(ConfigPhysicalPlanType.CreatePipeSink);
+ public CreatePipeSinkPlanV1() {
+ super(ConfigPhysicalPlanType.CreatePipeSinkV1);
}
- public CreatePipeSinkPlan(TPipeSinkInfo pipeSinkInfo) {
+ public CreatePipeSinkPlanV1(TPipeSinkInfo pipeSinkInfo) {
this();
this.pipeSinkInfo = pipeSinkInfo;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipePlanV1.java
similarity index 88%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipePlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipePlanV1.java
index 96f0dd9f53..7dbcde6d9f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipePlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipePlanV1.java
@@ -26,15 +26,17 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-public class DropPipePlan extends ConfigPhysicalPlan {
+// Deprecated, restored for upgrade
+@Deprecated
+public class DropPipePlanV1 extends ConfigPhysicalPlan {
private String pipeName;
- public DropPipePlan() {
- super(ConfigPhysicalPlanType.DropPipe);
+ public DropPipePlanV1() {
+ super(ConfigPhysicalPlanType.DropPipeV1);
}
- public DropPipePlan(String pipeName) {
+ public DropPipePlanV1(String pipeName) {
this();
this.pipeName = pipeName;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipeSinkPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipeSinkPlanV1.java
similarity index 87%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipeSinkPlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipeSinkPlanV1.java
index 9ac2c09f6c..f50a8e1cd1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipeSinkPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipeSinkPlanV1.java
@@ -26,15 +26,17 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-public class DropPipeSinkPlan extends ConfigPhysicalPlan {
+// Deprecated, restored for upgrade
+@Deprecated
+public class DropPipeSinkPlanV1 extends ConfigPhysicalPlan {
private String pipeSinkName;
- public DropPipeSinkPlan() {
- super(ConfigPhysicalPlanType.DropPipeSink);
+ public DropPipeSinkPlanV1() {
+ super(ConfigPhysicalPlanType.DropPipeSinkV1);
}
- public DropPipeSinkPlan(String pipeSinkName) {
+ public DropPipeSinkPlanV1(String pipeSinkName) {
this();
this.pipeSinkName = pipeSinkName;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlanV1.java
similarity index 87%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlanV1.java
index 7014725438..26c4d2860b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlanV1.java
@@ -26,15 +26,17 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-public class GetPipeSinkPlan extends ConfigPhysicalPlan {
+// Deprecated, restored for upgrade
+@Deprecated
+public class GetPipeSinkPlanV1 extends ConfigPhysicalPlan {
/** empty pipeSinkName means get all PIPESINK */
private String pipeSinkName;
- public GetPipeSinkPlan() {
- super(ConfigPhysicalPlanType.GetPipeSink);
+ public GetPipeSinkPlanV1() {
+ super(ConfigPhysicalPlanType.GetPipeSinkV1);
}
- public GetPipeSinkPlan(String pipeSinkName) {
+ public GetPipeSinkPlanV1(String pipeSinkName) {
this();
this.pipeSinkName = pipeSinkName;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/PreCreatePipePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/PreCreatePipePlanV1.java
similarity index 86%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/PreCreatePipePlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/PreCreatePipePlanV1.java
index 34665947e9..24c3125fc1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/PreCreatePipePlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/PreCreatePipePlanV1.java
@@ -26,15 +26,17 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-public class PreCreatePipePlan extends ConfigPhysicalPlan {
+// Deprecated, restored for upgrade
+@Deprecated
+public class PreCreatePipePlanV1 extends ConfigPhysicalPlan {
private PipeInfo pipeInfo;
- public PreCreatePipePlan() {
- super(ConfigPhysicalPlanType.PreCreatePipe);
+ public PreCreatePipePlanV1() {
+ super(ConfigPhysicalPlanType.PreCreatePipeV1);
}
- public PreCreatePipePlan(PipeInfo pipeInfo) {
+ public PreCreatePipePlanV1(PipeInfo pipeInfo) {
this();
this.pipeInfo = pipeInfo;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/RecordPipeMessagePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/RecordPipeMessagePlan.java
index d29aa92a97..3fe6b8c8fa 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/RecordPipeMessagePlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/RecordPipeMessagePlan.java
@@ -27,13 +27,15 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+// Deprecated, restored for upgrade
+@Deprecated
public class RecordPipeMessagePlan extends ConfigPhysicalPlan {
private String pipeName;
private PipeMessage pipeMessage;
public RecordPipeMessagePlan() {
- super(ConfigPhysicalPlanType.RecordPipeMessage);
+ super(ConfigPhysicalPlanType.RecordPipeMessageV1);
}
public RecordPipeMessagePlan(String pipeName, PipeMessage pipeMessage) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlanV1.java
similarity index 81%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlanV1.java
index 999ba3aea5..4aa0f9668e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlanV1.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.confignode.consensus.request.write.sync;
import org.apache.iotdb.commons.sync.pipe.PipeStatus;
@@ -27,36 +28,31 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-public class SetPipeStatusPlan extends ConfigPhysicalPlan {
+// Deprecated, restored for upgrade
+@Deprecated
+public class SetPipeStatusPlanV1 extends ConfigPhysicalPlan {
private String pipeName;
+
private PipeStatus pipeStatus;
- public SetPipeStatusPlan() {
- super(ConfigPhysicalPlanType.SetPipeStatus);
+ public SetPipeStatusPlanV1() {
+ super(ConfigPhysicalPlanType.SetPipeStatusV1);
}
- public SetPipeStatusPlan(String pipeName, PipeStatus pipeStatus) {
- this();
+ public SetPipeStatusPlanV1(String pipeName, PipeStatus status) {
+ super(ConfigPhysicalPlanType.SetPipeStatusV1);
this.pipeName = pipeName;
- this.pipeStatus = pipeStatus;
+ this.pipeStatus = status;
}
public String getPipeName() {
return pipeName;
}
- public void setPipeName(String pipeName) {
- this.pipeName = pipeName;
- }
-
public PipeStatus getPipeStatus() {
return pipeStatus;
}
- public void setPipeStatus(PipeStatus pipeStatus) {
- this.pipeStatus = pipeStatus;
- }
-
@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
stream.writeShort(getType().getPlanType());
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/ShowPipePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/ShowPipePlanV1.java
similarity index 88%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/ShowPipePlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/ShowPipePlanV1.java
index cc01a7d53a..0acf6c277d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/ShowPipePlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/ShowPipePlanV1.java
@@ -26,15 +26,17 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-public class ShowPipePlan extends ConfigPhysicalPlan {
+// Deprecated, restored for upgrade
+@Deprecated
+public class ShowPipePlanV1 extends ConfigPhysicalPlan {
/** empty pipeName means show all PIPE */
private String pipeName;
- public ShowPipePlan() {
- super(ConfigPhysicalPlanType.ShowPipe);
+ public ShowPipePlanV1() {
+ super(ConfigPhysicalPlanType.ShowPipeV1);
}
- public ShowPipePlan(String pipeName) {
+ public ShowPipePlanV1(String pipeName) {
this();
this.pipeName = pipeName;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 96ea55f134..69cf4a447f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -39,7 +39,6 @@ import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.service.metric.MetricService;
-import org.apache.iotdb.commons.sync.pipe.PipeMessage;
import org.apache.iotdb.commons.utils.AuthUtils;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.StatusUtils;
@@ -66,8 +65,6 @@ import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.response.auth.PermissionInfoResp;
import org.apache.iotdb.confignode.consensus.response.database.CountDatabaseResp;
@@ -104,7 +101,6 @@ import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
import org.apache.iotdb.confignode.persistence.quota.QuotaInfo;
import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
-import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
import org.apache.iotdb.confignode.rpc.thrift.TClusterParameters;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
@@ -134,8 +130,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
-import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
-import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
@@ -230,8 +224,6 @@ public class ConfigManager implements IManager {
/** Manage Trigger. */
private final TriggerManager triggerManager;
- /** Sync. */
- private final SyncManager syncManager;
/** CQ. */
private final CQManager cqManager;
@@ -260,7 +252,6 @@ public class ConfigManager implements IManager {
ProcedureInfo procedureInfo = new ProcedureInfo();
UDFInfo udfInfo = new UDFInfo();
TriggerInfo triggerInfo = new TriggerInfo();
- ClusterSyncInfo syncInfo = new ClusterSyncInfo();
CQInfo cqInfo = new CQInfo();
ModelInfo modelInfo = new ModelInfo();
PipeInfo pipeInfo = new PipeInfo();
@@ -276,7 +267,6 @@ public class ConfigManager implements IManager {
procedureInfo,
udfInfo,
triggerInfo,
- syncInfo,
cqInfo,
modelInfo,
pipeInfo,
@@ -291,7 +281,6 @@ public class ConfigManager implements IManager {
this.procedureManager = new ProcedureManager(this, procedureInfo);
this.udfManager = new UDFManager(this, udfInfo);
this.triggerManager = new TriggerManager(this, triggerInfo);
- this.syncManager = new SyncManager(this, syncInfo);
this.cqManager = new CQManager(this);
this.loadManager = new LoadManager(this);
this.modelManager = new ModelManager(this, modelInfo);
@@ -931,10 +920,6 @@ public class ConfigManager implements IManager {
}
@Override
- public SyncManager getSyncManager() {
- return syncManager;
- }
-
public ModelManager getModelManager() {
return modelManager;
}
@@ -1569,83 +1554,36 @@ public class ConfigManager implements IManager {
}
}
- @Override
- public TSStatus createPipeSink(CreatePipeSinkPlan plan) {
- TSStatus status = confirmLeader();
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return syncManager.createPipeSink(plan);
- } else {
- return status;
- }
- }
-
- @Override
- public TSStatus dropPipeSink(DropPipeSinkPlan plan) {
- TSStatus status = confirmLeader();
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return syncManager.dropPipeSink(plan);
- } else {
- return status;
- }
- }
-
- @Override
- public TGetPipeSinkResp getPipeSink(TGetPipeSinkReq req) {
- TSStatus status = confirmLeader();
- TGetPipeSinkResp resp = new TGetPipeSinkResp();
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return syncManager.getPipeSink(req.getPipeSinkName());
- } else {
- return resp.setStatus(status);
- }
- }
-
@Override
public TSStatus createPipe(TCreatePipeReq req) {
TSStatus status = confirmLeader();
- LOGGER.info("createPipe: {}", req);
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- // TODO: Implement PipeManager
- return status;
- } else {
- return status;
- }
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ ? pipeManager.getPipeTaskCoordinator().createPipe(req)
+ : status;
}
@Override
public TSStatus startPipe(String pipeName) {
TSStatus status = confirmLeader();
- LOGGER.info("startPipe: {}", pipeName);
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- // TODO: Implement PipeManager
- return status;
- } else {
- return status;
- }
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ ? pipeManager.getPipeTaskCoordinator().startPipe(pipeName)
+ : status;
}
@Override
public TSStatus stopPipe(String pipeName) {
TSStatus status = confirmLeader();
- LOGGER.info("stopPipe: {}", pipeName);
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- // TODO: Implement PipeManager
- return status;
- } else {
- return status;
- }
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ ? pipeManager.getPipeTaskCoordinator().stopPipe(pipeName)
+ : status;
}
@Override
public TSStatus dropPipe(String pipeName) {
TSStatus status = confirmLeader();
- LOGGER.info("dropPipe: {}", pipeName);
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- // TODO: Implement PipeManager
- return status;
- } else {
- return status;
- }
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ ? pipeManager.getPipeTaskCoordinator().dropPipe(pipeName)
+ : status;
}
@Override
@@ -1664,22 +1602,17 @@ public class ConfigManager implements IManager {
@Override
public TGetAllPipeInfoResp getAllPipeInfo() {
TSStatus status = confirmLeader();
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return syncManager.getAllPipeInfo();
- } else {
- return new TGetAllPipeInfoResp().setStatus(status);
- }
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ ? pipeManager.getPipeTaskCoordinator().showPipes()
+ : new TGetAllPipeInfoResp().setStatus(status);
}
@Override
public TSStatus recordPipeMessage(TRecordPipeMessageReq req) {
TSStatus status = confirmLeader();
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return syncManager.recordPipeMessage(
- req.getPipeName(), PipeMessage.deserialize(ByteBuffer.wrap(req.getMessage())));
- } else {
- return status;
- }
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ ? pipeManager.getPipeTaskCoordinator().recordPipeMessage(req)
+ : status;
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 3d6490db57..9726b93b2d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -41,8 +41,6 @@ import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaRep
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.cq.CQManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
@@ -76,8 +74,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
-import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
-import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
@@ -168,13 +164,6 @@ public interface IManager {
*/
TriggerManager getTriggerManager();
- /**
- * Get SyncManager
- *
- * @return SyncManager instance
- */
- SyncManager getSyncManager();
-
/**
* Get ProcedureManager
*
@@ -189,6 +178,13 @@ public interface IManager {
*/
CQManager getCQManager();
+ /**
+ * Get ModelManager
+ *
+ * @return ModelManager instance
+ */
+ ModelManager getModelManager();
+
/**
* Get PipeManager
*
@@ -549,30 +545,6 @@ public interface IManager {
*/
TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req);
- /**
- * Create PipeSink
- *
- * @param plan Info about PipeSink
- * @return TSStatus
- */
- TSStatus createPipeSink(CreatePipeSinkPlan plan);
-
- /**
- * Drop PipeSink
- *
- * @param plan Name of PipeSink
- * @return TSStatus
- */
- TSStatus dropPipeSink(DropPipeSinkPlan plan);
-
- /**
- * Get PipeSink by name. If pipeSinkName is empty, get all PipeSinks.
- *
- * @param req specify the pipeSinkName
- * @return TGetPipeSinkResp contains the PipeSink
- */
- TGetPipeSinkResp getPipeSink(TGetPipeSinkReq req);
-
/**
* Create Pipe
*
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 63a78d36c1..6b03eea066 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.commons.exception.sync.PipeException;
import org.apache.iotdb.commons.model.ModelInformation;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
@@ -54,16 +53,16 @@ import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure
import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
+import org.apache.iotdb.confignode.procedure.impl.pipe.task.CreatePipeProcedureV2;
+import org.apache.iotdb.confignode.procedure.impl.pipe.task.DropPipeProcedureV2;
+import org.apache.iotdb.confignode.procedure.impl.pipe.task.StartPipeProcedureV2;
+import org.apache.iotdb.confignode.procedure.impl.pipe.task.StopPipeProcedureV2;
import org.apache.iotdb.confignode.procedure.impl.schema.DeactivateTemplateProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.DeleteDatabaseProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.DeleteTimeSeriesProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.UnsetTemplateProcedure;
import org.apache.iotdb.confignode.procedure.impl.statemachine.CreateRegionGroupsProcedure;
import org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure;
-import org.apache.iotdb.confignode.procedure.impl.sync.CreatePipeProcedure;
-import org.apache.iotdb.confignode.procedure.impl.sync.DropPipeProcedure;
-import org.apache.iotdb.confignode.procedure.impl.sync.StartPipeProcedure;
-import org.apache.iotdb.confignode.procedure.impl.sync.StopPipeProcedure;
import org.apache.iotdb.confignode.procedure.impl.trigger.CreateTriggerProcedure;
import org.apache.iotdb.confignode.procedure.impl.trigger.DropTriggerProcedure;
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
@@ -624,7 +623,7 @@ public class ProcedureManager {
public TSStatus createPipe(TCreatePipeReq req) {
try {
- long procedureId = executor.submitProcedure(new CreatePipeProcedure(req));
+ long procedureId = executor.submitProcedure(new CreatePipeProcedureV2(req));
List<TSStatus> statusList = new ArrayList<>();
boolean isSucceed =
waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
@@ -634,14 +633,14 @@ public class ProcedureManager {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
.setMessage(statusList.get(0).getMessage());
}
- } catch (PipeException e) {
+ } catch (Exception e) {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
}
}
public TSStatus startPipe(String pipeName) {
try {
- long procedureId = executor.submitProcedure(new StartPipeProcedure(pipeName));
+ long procedureId = executor.submitProcedure(new StartPipeProcedureV2(pipeName));
List<TSStatus> statusList = new ArrayList<>();
boolean isSucceed =
waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
@@ -651,14 +650,14 @@ public class ProcedureManager {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
.setMessage(statusList.get(0).getMessage());
}
- } catch (PipeException e) {
+ } catch (Exception e) {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
}
}
public TSStatus stopPipe(String pipeName) {
try {
- long procedureId = executor.submitProcedure(new StopPipeProcedure(pipeName));
+ long procedureId = executor.submitProcedure(new StopPipeProcedureV2(pipeName));
List<TSStatus> statusList = new ArrayList<>();
boolean isSucceed =
waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
@@ -668,14 +667,14 @@ public class ProcedureManager {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
.setMessage(statusList.get(0).getMessage());
}
- } catch (PipeException e) {
+ } catch (Exception e) {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
}
}
public TSStatus dropPipe(String pipeName) {
try {
- long procedureId = executor.submitProcedure(new DropPipeProcedure(pipeName));
+ long procedureId = executor.submitProcedure(new DropPipeProcedureV2(pipeName));
List<TSStatus> statusList = new ArrayList<>();
boolean isSucceed =
waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
@@ -685,7 +684,7 @@ public class ProcedureManager {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
.setMessage(statusList.get(0).getMessage());
}
- } catch (PipeException e) {
+ } catch (Exception e) {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
index 97666a005b..c9ffdd67b8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
@@ -23,35 +23,26 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
-import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
-import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
- * The RetryFailedTasksThread executed periodically to retry failed tasks in Trigger, Sync, Template
- * and CQ
+ * The RetryFailedTasksThread executed periodically to retry failed tasks in Trigger, Template and
+ * CQ
*/
public class RetryFailedTasksThread {
@@ -69,10 +60,6 @@ public class RetryFailedTasksThread {
/** Trigger */
private final Set<TDataNodeLocation> oldUnknownNodes;
- /** Sync */
- private final Map<Integer, Queue<TOperatePipeOnDataNodeReq>> messageMap =
- new ConcurrentHashMap<>();
-
public RetryFailedTasksThread(IManager configManager) {
this.configManager = configManager;
this.nodeManager = configManager.getNodeManager();
@@ -108,9 +95,6 @@ public class RetryFailedTasksThread {
private void retryFailedTasks() {
// trigger
triggerDetectTask();
-
- // sync
- syncDetectTask();
}
/**
@@ -148,48 +132,4 @@ public class RetryFailedTasksThread {
}
}
}
-
- public void retryRollbackReq(List<Integer> dataNodeIds, TOperatePipeOnDataNodeReq req) {
- for (int id : dataNodeIds) {
- messageMap.computeIfAbsent(id, i -> new LinkedList<>()).add(req);
- }
- }
-
- /**
- * The syncDetectTask executed periodically to roll back the failed requests in operating pipe.
- */
- private void syncDetectTask() {
- for (Map.Entry<Integer, Queue<TOperatePipeOnDataNodeReq>> entry : messageMap.entrySet()) {
- int dataNodeId = entry.getKey();
- if (NodeStatus.Running.equals(nodeManager.getNodeStatusByNodeId(dataNodeId))) {
- final Map<Integer, TDataNodeLocation> dataNodeLocationMap = new HashMap<>();
- dataNodeLocationMap.put(
- dataNodeId, nodeManager.getRegisteredDataNodeLocations().get(dataNodeId));
- TOperatePipeOnDataNodeReq request;
- while ((request = entry.getValue().peek()) != null) {
- AsyncClientHandler<TOperatePipeOnDataNodeReq, TSStatus> clientHandler =
- new AsyncClientHandler<>(
- DataNodeRequestType.ROLLBACK_OPERATE_PIPE, request, dataNodeLocationMap);
- AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
- TSStatus tsStatus = clientHandler.getResponseList().get(0);
- if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- entry.getValue().poll();
- } else if (tsStatus.getCode() == TSStatusCode.PIPE_ERROR.getStatusCode()) {
- // skip
- LOGGER.warn(
- String.format(
- "Roll back failed because %s. Skip this roll back request [%s].",
- tsStatus.getMessage(), request));
- } else {
- // connection failure, keep and retry.
- LOGGER.error(
- String.format(
- "Roll back failed because %s. This roll back request [%s] will be retried later.",
- tsStatus.getMessage(), request));
- break;
- }
- }
- }
- }
- }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java
deleted file mode 100644
index dc2c7bdd95..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.manager;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.commons.exception.sync.PipeException;
-import org.apache.iotdb.commons.exception.sync.PipeSinkException;
-import org.apache.iotdb.commons.exception.sync.PipeSinkNotExistException;
-import org.apache.iotdb.commons.sync.pipe.PipeInfo;
-import org.apache.iotdb.commons.sync.pipe.PipeMessage;
-import org.apache.iotdb.commons.sync.pipe.PipeStatus;
-import org.apache.iotdb.commons.sync.pipe.SyncOperation;
-import org.apache.iotdb.commons.sync.pipesink.PipeSink;
-import org.apache.iotdb.commons.utils.StatusUtils;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
-import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.RecordPipeMessagePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan;
-import org.apache.iotdb.confignode.consensus.response.pipe.PipeResp;
-import org.apache.iotdb.confignode.consensus.response.pipe.PipeSinkResp;
-import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
-import org.apache.iotdb.confignode.manager.node.NodeManager;
-import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent;
-import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
-import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
-import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
-import org.apache.iotdb.mpp.rpc.thrift.TCreatePipeOnDataNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import com.google.common.eventbus.AllowConcurrentEvents;
-import com.google.common.eventbus.Subscribe;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-
-public class SyncManager {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(SyncManager.class);
-
- private final IManager configManager;
- private final ClusterSyncInfo clusterSyncInfo;
-
- public SyncManager(IManager configManager, ClusterSyncInfo clusterSyncInfo) {
- this.configManager = configManager;
- this.clusterSyncInfo = clusterSyncInfo;
- }
-
- public void lockSyncMetadata() {
- clusterSyncInfo.lockSyncMetadata();
- }
-
- public void unlockSyncMetadata() {
- clusterSyncInfo.unlockSyncMetadata();
- }
-
- // ======================================================
- // region Implement of PipeSink
- // ======================================================
-
- public TSStatus createPipeSink(CreatePipeSinkPlan plan) {
- try {
- clusterSyncInfo.checkAddPipeSink(plan);
- return getConsensusManager().write(plan).getStatus();
- } catch (PipeSinkException e) {
- LOGGER.error(e.getMessage());
- return new TSStatus(TSStatusCode.CREATE_PIPE_SINK_ERROR.getStatusCode())
- .setMessage(e.getMessage());
- }
- }
-
- public TSStatus dropPipeSink(DropPipeSinkPlan plan) {
- try {
- clusterSyncInfo.checkDropPipeSink(plan.getPipeSinkName());
- return getConsensusManager().write(plan).getStatus();
- } catch (PipeSinkException e) {
- LOGGER.error(e.getMessage());
- return new TSStatus(TSStatusCode.CREATE_PIPE_SINK_ERROR.getStatusCode())
- .setMessage(e.getMessage());
- }
- }
-
- public TGetPipeSinkResp getPipeSink(String pipeSinkName) {
- GetPipeSinkPlan getPipeSinkPlan = new GetPipeSinkPlan(pipeSinkName);
- PipeSinkResp pipeSinkResp =
- (PipeSinkResp) getConsensusManager().read(getPipeSinkPlan).getDataset();
- TGetPipeSinkResp resp = new TGetPipeSinkResp();
- resp.setStatus(pipeSinkResp.getStatus());
- if (pipeSinkResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- resp.setPipeSinkInfoList(
- pipeSinkResp.getPipeSinkList().stream()
- .map(PipeSink::getTPipeSinkInfo)
- .collect(Collectors.toList()));
- }
- return resp;
- }
-
- // endregion
-
- // ======================================================
- // region Implement of Pipe
- // ======================================================
-
- public void checkAddPipe(PipeInfo pipeInfo) throws PipeException, PipeSinkNotExistException {
- clusterSyncInfo.checkAddPipe(pipeInfo);
- }
-
- public TSStatus preCreatePipe(PipeInfo pipeInfo) {
- pipeInfo.setStatus(PipeStatus.PARTIAL_CREATE);
- return getConsensusManager().write(new PreCreatePipePlan(pipeInfo)).getStatus();
- }
-
- public TSStatus setPipeStatus(String pipeName, PipeStatus pipeStatus) {
- return getConsensusManager().write(new SetPipeStatusPlan(pipeName, pipeStatus)).getStatus();
- }
-
- public TSStatus dropPipe(String pipeName) {
- return getConsensusManager().write(new DropPipePlan(pipeName)).getStatus();
- }
-
- public TSStatus recordPipeMessage(String pipeName, PipeMessage pipeMessage) {
- return getConsensusManager()
- .write(new RecordPipeMessagePlan(pipeName, pipeMessage))
- .getStatus();
- }
-
- public TShowPipeResp showPipe(String pipeName) {
- ShowPipePlan showPipePlan = new ShowPipePlan(pipeName);
- PipeResp pipeResp = (PipeResp) getConsensusManager().read(showPipePlan).getDataset();
- TShowPipeResp resp = new TShowPipeResp();
- resp.setStatus(pipeResp.getStatus());
- if (pipeResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- resp.setPipeInfoList(
- pipeResp.getPipeInfoList().stream()
- .map(PipeInfo::getTShowPipeInfo)
- .collect(Collectors.toList()));
- }
- return resp;
- }
-
- public PipeInfo getPipeInfo(String pipeName) throws PipeException {
- return clusterSyncInfo.getPipeInfo(pipeName);
- }
-
- public TGetAllPipeInfoResp getAllPipeInfo() {
- try {
- // Should lock SyncMetadata to block operation PIPE procedure
- lockSyncMetadata();
- TGetAllPipeInfoResp resp = new TGetAllPipeInfoResp();
- resp.setStatus(StatusUtils.OK);
- resp.setAllPipeInfo(
- clusterSyncInfo.getAllPipeInfos().stream()
- .map(PipeInfo::serializeToByteBuffer)
- .collect(Collectors.toList()));
- return resp;
- } finally {
- unlockSyncMetadata();
- }
- }
-
- /**
- * Broadcast DataNodes to operate PIPE operation.
- *
- * @param pipeName name of PIPE
- * @param operation only support {@link SyncOperation#START_PIPE}, {@link SyncOperation#STOP_PIPE}
- * and {@link SyncOperation#DROP_PIPE}
- * @return Map key is DataNodeId and value is TSStatus
- */
- public Map<Integer, TSStatus> operatePipeOnDataNodes(String pipeName, SyncOperation operation) {
- NodeManager nodeManager = configManager.getNodeManager();
- final Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
- nodeManager
- .filterDataNodeThroughStatus(NodeStatus.Running)
- .forEach(
- dataNodeConfiguration ->
- dataNodeLocationMap.put(
- dataNodeConfiguration.getLocation().getDataNodeId(),
- dataNodeConfiguration.getLocation()));
- final TOperatePipeOnDataNodeReq request =
- new TOperatePipeOnDataNodeReq(pipeName, (byte) operation.ordinal());
-
- AsyncClientHandler<TOperatePipeOnDataNodeReq, TSStatus> clientHandler =
- new AsyncClientHandler<>(DataNodeRequestType.OPERATE_PIPE, request, dataNodeLocationMap);
- AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
-
- return clientHandler.getResponseMap();
- }
-
- /**
- * Broadcast DataNodes to operate PIPE operation for roll back procedure.
- *
- * @param pipeName name of PIPE
- * @param createTime specifies the version of the pipe that needs to be rolled back to avoid
- * concurrent errors caused by retry requests
- * @param operation only support {@link SyncOperation#START_PIPE}, {@link SyncOperation#STOP_PIPE}
- * and {@link SyncOperation#DROP_PIPE}
- * @param dataNodeIds target DataNodeId set
- */
- public void operatePipeOnDataNodesForRollback(
- String pipeName, long createTime, SyncOperation operation, Set<Integer> dataNodeIds) {
- NodeManager nodeManager = configManager.getNodeManager();
- final Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
- nodeManager.filterDataNodeThroughStatus(NodeStatus.Running).stream()
- .filter(
- dataNodeConfiguration ->
- dataNodeIds.contains(dataNodeConfiguration.getLocation().dataNodeId))
- .forEach(
- dataNodeConfiguration ->
- dataNodeLocationMap.put(
- dataNodeConfiguration.getLocation().getDataNodeId(),
- dataNodeConfiguration.getLocation()));
- final TOperatePipeOnDataNodeReq request =
- new TOperatePipeOnDataNodeReq(pipeName, (byte) operation.ordinal())
- .setCreateTime(createTime);
-
- AsyncClientHandler<TOperatePipeOnDataNodeReq, TSStatus> clientHandler =
- new AsyncClientHandler<>(
- DataNodeRequestType.ROLLBACK_OPERATE_PIPE, request, dataNodeLocationMap);
- AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
-
- List<Integer> failedRollbackDataNodeId = new ArrayList<>();
- for (Map.Entry<Integer, TSStatus> responseEntry : clientHandler.getResponseMap().entrySet()) {
- if (responseEntry.getValue().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- failedRollbackDataNodeId.add(responseEntry.getKey());
- }
- }
- configManager.getRetryFailedTasksThread().retryRollbackReq(failedRollbackDataNodeId, request);
- }
-
- /**
- * Broadcast DataNodes to pre create PIPE
- *
- * @param pipeInfo pipeInfo
- * @return Map key is DataNodeId and value is TSStatus
- */
- public Map<Integer, TSStatus> preCreatePipeOnDataNodes(PipeInfo pipeInfo) {
- NodeManager nodeManager = configManager.getNodeManager();
- final Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
- nodeManager
- .filterDataNodeThroughStatus(NodeStatus.Running)
- .forEach(
- dataNodeConfiguration ->
- dataNodeLocationMap.put(
- dataNodeConfiguration.getLocation().getDataNodeId(),
- dataNodeConfiguration.getLocation()));
- final TCreatePipeOnDataNodeReq request =
- new TCreatePipeOnDataNodeReq(pipeInfo.serializeToByteBuffer());
-
- AsyncClientHandler<TCreatePipeOnDataNodeReq, TSStatus> clientHandler =
- new AsyncClientHandler<>(DataNodeRequestType.PRE_CREATE_PIPE, request, dataNodeLocationMap);
- AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
-
- return clientHandler.getResponseMap();
- }
-
- /**
- * When some Nodes' states changed during a heartbeat loop, the eventbus in LoadManager will post
- * the different NodeStatstics event to SyncManager and ClusterSchemaManager.
- *
- * @param nodeStatisticsEvent nodeStatistics that changed in a heartbeat loop
- */
- @Subscribe
- @AllowConcurrentEvents
- public void handleNodeStatistics(NodeStatisticsEvent nodeStatisticsEvent) {
- // TODO
- }
-
- // endregion
-
- private ConsensusManager getConsensusManager() {
- return configManager.getConsensusManager();
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index a19c942d25..932f4039cc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -105,7 +105,6 @@ public class LoadManager {
this.routeBalancer = new RouteBalancer(configManager);
eventBus.register(configManager.getClusterSchemaManager());
- eventBus.register(configManager.getSyncManager());
}
/**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 97125db05a..bcf116737b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -233,7 +233,8 @@ public class NodeManager {
}
private TRuntimeConfiguration getRuntimeConfiguration() {
- getPipeManager().getPipePluginCoordinator().getPipePluginInfo().acquirePipePluginInfoLock();
+ getPipeManager().getPipePluginCoordinator().lock();
+ getPipeManager().getPipeTaskCoordinator().lock();
getTriggerManager().getTriggerInfo().acquireTriggerTableLock();
getUDFManager().getUdfInfo().acquireUDFTableLock();
@@ -252,7 +253,8 @@ public class NodeManager {
} finally {
getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
getUDFManager().getUdfInfo().releaseUDFTableLock();
- getPipeManager().getPipePluginCoordinator().getPipePluginInfo().releasePipePluginInfoLock();
+ getPipeManager().getPipeTaskCoordinator().unlock();
+ getPipeManager().getPipePluginCoordinator().unlock();
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java
index dbe95bab31..648d9eaff2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java
@@ -22,21 +22,30 @@ package org.apache.iotdb.confignode.manager.pipe;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
public class PipeManager {
- private static final Logger LOGGER = LoggerFactory.getLogger(PipeManager.class);
-
private final PipePluginCoordinator pipePluginCoordinator;
+ private final PipeTaskCoordinator pipeTaskCoordinator;
+
+ private final PipeInfo pipeInfo;
+
public PipeManager(ConfigManager configManager, PipeInfo pipeInfo) {
this.pipePluginCoordinator =
new PipePluginCoordinator(configManager, pipeInfo.getPipePluginInfo());
+ this.pipeTaskCoordinator = new PipeTaskCoordinator(configManager, pipeInfo.getPipeTaskInfo());
+ this.pipeInfo = pipeInfo;
}
public PipePluginCoordinator getPipePluginCoordinator() {
return pipePluginCoordinator;
}
+
+ public PipeTaskCoordinator getPipeTaskCoordinator() {
+ return pipeTaskCoordinator;
+ }
+
+ public PipeInfo getPipeInfo() {
+ return pipeInfo;
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
index 9e45a41dce..91da899463 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
@@ -55,6 +55,14 @@ public class PipePluginCoordinator {
return pipePluginInfo;
}
+ public void lock() {
+ pipePluginInfo.acquirePipePluginInfoLock();
+ }
+
+ public void unlock() {
+ pipePluginInfo.releasePipePluginInfoLock();
+ }
+
public TSStatus createPipePlugin(TCreatePipePluginReq req) {
final String pluginName = req.getPluginName().toUpperCase();
final String className = req.getClassName();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
new file mode 100644
index 0000000000..908bd643f3
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.pipe;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
+
+public class PipeTaskCoordinator {
+
+ private final ConfigManager configManager;
+ private final PipeTaskInfo pipeTaskInfo;
+
+ public PipeTaskCoordinator(ConfigManager configManager, PipeTaskInfo pipeTaskInfo) {
+ this.configManager = configManager;
+ this.pipeTaskInfo = pipeTaskInfo;
+ }
+
+ public void lock() {
+ pipeTaskInfo.acquirePipeTaskInfoLock();
+ }
+
+ public void unlock() {
+ pipeTaskInfo.releasePipeTaskInfoLock();
+ }
+
+ public TSStatus createPipe(TCreatePipeReq req) {
+ return configManager.getProcedureManager().createPipe(req);
+ }
+
+ public TSStatus startPipe(String pipeName) {
+ return configManager.getProcedureManager().startPipe(pipeName);
+ }
+
+ public TSStatus stopPipe(String pipeName) {
+ return configManager.getProcedureManager().stopPipe(pipeName);
+ }
+
+ public TSStatus dropPipe(String pipeName) {
+ return configManager.getProcedureManager().dropPipe(pipeName);
+ }
+
+ public TGetAllPipeInfoResp showPipes() {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ public TSStatus recordPipeMessage(TRecordPipeMessageReq req) {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 954d7489a9..1721eef36e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -76,6 +76,9 @@ import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchem
import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
import org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
import org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
@@ -83,14 +86,6 @@ import org.apache.iotdb.confignode.consensus.request.write.quota.SetThrottleQuot
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRegionMaintainTaskPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.RecordPipeMessagePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.DropSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.PreUnsetSchemaTemplatePlan;
@@ -115,7 +110,6 @@ import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
import org.apache.iotdb.confignode.persistence.quota.QuotaInfo;
import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
-import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.consensus.common.DataSet;
@@ -157,7 +151,6 @@ public class ConfigPlanExecutor {
private final UDFInfo udfInfo;
private final TriggerInfo triggerInfo;
- private final ClusterSyncInfo syncInfo;
private final CQInfo cqInfo;
@@ -175,7 +168,6 @@ public class ConfigPlanExecutor {
ProcedureInfo procedureInfo,
UDFInfo udfInfo,
TriggerInfo triggerInfo,
- ClusterSyncInfo syncInfo,
CQInfo cqInfo,
ModelInfo modelInfo,
PipeInfo pipeInfo,
@@ -201,9 +193,6 @@ public class ConfigPlanExecutor {
this.udfInfo = udfInfo;
this.snapshotProcessorList.add(udfInfo);
- this.syncInfo = syncInfo;
- this.snapshotProcessorList.add(syncInfo);
-
this.cqInfo = cqInfo;
this.snapshotProcessorList.add(cqInfo);
@@ -258,10 +247,6 @@ public class ConfigPlanExecutor {
return clusterSchemaInfo.getAllTemplateSetInfo();
case GetTemplateSetInfo:
return clusterSchemaInfo.getTemplateSetInfo((GetTemplateSetInfoPlan) req);
- case GetPipeSink:
- return syncInfo.getPipeSink((GetPipeSinkPlan) req);
- case ShowPipe:
- return syncInfo.showPipe((ShowPipePlan) req);
case GetTriggerTable:
return triggerInfo.getTriggerTable((GetTriggerTablePlan) req);
case GetTriggerLocation:
@@ -394,18 +379,12 @@ public class ConfigPlanExecutor {
return clusterSchemaInfo.unsetSchemaTemplate((UnsetSchemaTemplatePlan) physicalPlan);
case DropSchemaTemplate:
return clusterSchemaInfo.dropSchemaTemplate((DropSchemaTemplatePlan) physicalPlan);
- case CreatePipeSink:
- return syncInfo.addPipeSink((CreatePipeSinkPlan) physicalPlan);
- case DropPipeSink:
- return syncInfo.dropPipeSink((DropPipeSinkPlan) physicalPlan);
- case PreCreatePipe:
- return syncInfo.preCreatePipe((PreCreatePipePlan) physicalPlan);
- case SetPipeStatus:
- return syncInfo.setPipeStatus((SetPipeStatusPlan) physicalPlan);
- case DropPipe:
- return syncInfo.dropPipe((DropPipePlan) physicalPlan);
- case RecordPipeMessage:
- return syncInfo.recordPipeMessage((RecordPipeMessagePlan) physicalPlan);
+ case CreatePipeV2:
+ return pipeInfo.getPipeTaskInfo().createPipe((CreatePipePlanV2) physicalPlan);
+ case SetPipeStatusV2:
+ return pipeInfo.getPipeTaskInfo().setPipeStatus((SetPipeStatusPlanV2) physicalPlan);
+ case DropPipeV2:
+ return pipeInfo.getPipeTaskInfo().dropPipe((DropPipePlanV2) physicalPlan);
case ADD_CQ:
return cqInfo.addCQ((AddCQPlan) physicalPlan);
case DROP_CQ:
@@ -426,6 +405,15 @@ public class ConfigPlanExecutor {
return pipeInfo.getPipePluginInfo().createPipePlugin((CreatePipePluginPlan) physicalPlan);
case DropPipePlugin:
return pipeInfo.getPipePluginInfo().dropPipePlugin((DropPipePluginPlan) physicalPlan);
+ case CreatePipeSinkV1:
+ case DropPipeV1:
+ case DropPipeSinkV1:
+ case GetPipeSinkV1:
+ case PreCreatePipeV1:
+ case RecordPipeMessageV1:
+ case SetPipeStatusV1:
+ case ShowPipeV1:
+ return new TSStatus(TSStatusCode.INCOMPATIBLE_VERSION.getStatusCode());
case setSpaceQuota:
return quotaInfo.setSpaceQuota((SetSpaceQuotaPlan) physicalPlan);
case setThrottleQuota:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
index c881b0a646..733dc4c776 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
@@ -23,26 +23,81 @@ import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import java.io.File;
import java.io.IOException;
+import java.util.Objects;
public class PipeInfo implements SnapshotProcessor {
private final PipePluginInfo pipePluginInfo;
+ private final PipeTaskInfo pipeTaskInfo;
public PipeInfo() throws IOException {
pipePluginInfo = new PipePluginInfo();
+ pipeTaskInfo = new PipeTaskInfo();
}
public PipePluginInfo getPipePluginInfo() {
return pipePluginInfo;
}
+ public PipeTaskInfo getPipeTaskInfo() {
+ return pipeTaskInfo;
+ }
+
+ ///////////////////////////////// SnapshotProcessor /////////////////////////////////
+
@Override
public boolean processTakeSnapshot(File snapshotDir) throws IOException {
- return pipePluginInfo.processTakeSnapshot(snapshotDir);
+ pipeTaskInfo.acquirePipeTaskInfoLock();
+ pipePluginInfo.acquirePipePluginInfoLock();
+ try {
+ return pipeTaskInfo.processTakeSnapshot(snapshotDir)
+ && pipePluginInfo.processTakeSnapshot(snapshotDir);
+ } finally {
+ pipePluginInfo.releasePipePluginInfoLock();
+ pipeTaskInfo.releasePipeTaskInfoLock();
+ }
}
@Override
public void processLoadSnapshot(File snapshotDir) throws IOException {
- pipePluginInfo.processLoadSnapshot(snapshotDir);
+ pipeTaskInfo.acquirePipeTaskInfoLock();
+ pipePluginInfo.acquirePipePluginInfoLock();
+ try {
+ pipeTaskInfo.processLoadSnapshot(snapshotDir);
+ pipePluginInfo.processLoadSnapshot(snapshotDir);
+ } finally {
+ pipePluginInfo.releasePipePluginInfoLock();
+ pipeTaskInfo.releasePipeTaskInfoLock();
+ }
+ }
+
+ ///////////////////////////////// equals & hashCode /////////////////////////////////
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PipeInfo pipeInfo = (PipeInfo) o;
+ return Objects.equals(pipePluginInfo, pipeInfo.pipePluginInfo)
+ && Objects.equals(pipeTaskInfo, pipeInfo.pipeTaskInfo);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(pipePluginInfo, pipeTaskInfo);
+ }
+
+ @Override
+ public String toString() {
+ return "PipeInfo{"
+ + "pipePluginInfo="
+ + pipePluginInfo
+ + ", pipeTaskInfo="
+ + pipeTaskInfo
+ + '}';
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 4a56f79328..0b47dccb6c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePip
import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
import org.apache.iotdb.confignode.consensus.response.pipe.plugin.PipePluginTableResp;
import org.apache.iotdb.confignode.consensus.response.udf.JarResp;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.pipe.api.exception.PipeManagementException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -48,6 +49,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;
public class PipePluginInfo implements SnapshotProcessor {
@@ -110,6 +112,11 @@ public class PipePluginInfo implements SnapshotProcessor {
return !pipePluginMetaKeeper.containsJar(jarName);
}
+ public boolean checkBeforeCreatePipe(TCreatePipeReq createPipeRequest) {
+ // TODO: validate the plugins in the create pipe Req.
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
/////////////////////////////// Pipe Plugin Management ///////////////////////////////
public TSStatus createPipePlugin(CreatePipePluginPlan physicalPlan) {
@@ -185,11 +192,8 @@ public class PipePluginInfo implements SnapshotProcessor {
return false;
}
- acquirePipePluginInfoLock();
try (FileOutputStream fileOutputStream = new FileOutputStream(snapshotFile)) {
pipePluginMetaKeeper.processTakeSnapshot(fileOutputStream);
- } finally {
- releasePipePluginInfoLock();
}
return true;
}
@@ -204,11 +208,40 @@ public class PipePluginInfo implements SnapshotProcessor {
return;
}
- acquirePipePluginInfoLock();
try (FileInputStream fileInputStream = new FileInputStream(snapshotFile)) {
pipePluginMetaKeeper.processLoadSnapshot(fileInputStream);
- } finally {
- releasePipePluginInfoLock();
}
}
+
+ /////////////////////////////// hashCode & equals ///////////////////////////////
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(pipePluginMetaKeeper, pipePluginExecutableManager);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ PipePluginInfo other = (PipePluginInfo) obj;
+ return Objects.equals(pipePluginExecutableManager, other.pipePluginExecutableManager)
+ && Objects.equals(pipePluginMetaKeeper, other.pipePluginMetaKeeper);
+ }
+
+ @Override
+ public String toString() {
+ return "PipePluginInfo [pipePluginMetaKeeper="
+ + pipePluginMetaKeeper
+ + ", pipePluginExecutableManager="
+ + pipePluginExecutableManager
+ + "]";
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
new file mode 100644
index 0000000000..9ea25320ff
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.persistence.pipe;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class PipeTaskInfo implements SnapshotProcessor {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskInfo.class);
+ private static final String SNAPSHOT_FILE_NAME = "pipe_task_info.bin";
+
+ private final ReentrantLock pipeTaskInfoLock = new ReentrantLock();
+
+ private final PipeMetaKeeper pipeMetaKeeper;
+
+ public PipeTaskInfo() {
+ this.pipeMetaKeeper = new PipeMetaKeeper();
+ }
+
+ /////////////////////////////// Lock ///////////////////////////////
+
+ public void acquirePipeTaskInfoLock() {
+ pipeTaskInfoLock.lock();
+ }
+
+ public void releasePipeTaskInfoLock() {
+ pipeTaskInfoLock.unlock();
+ }
+
+ /////////////////////////////// Validator ///////////////////////////////
+
+ public boolean checkBeforeCreatePipe(TCreatePipeReq createPipeRequest) {
+ if (!isPipeExisted(createPipeRequest.getPipeName())) {
+ return true;
+ }
+
+ LOGGER.info(
+ String.format(
+ "Failed to create pipe [%s], the pipe with the same name has been created",
+ createPipeRequest.getPipeName()));
+ return false;
+ }
+
+ public boolean checkBeforeStartPipe(String pipeName) {
+ if (!isPipeExisted(pipeName)) {
+ LOGGER.info(String.format("Failed to start pipe [%s], the pipe does not exist", pipeName));
+ return false;
+ }
+
+ if (getPipeStatus(pipeName) == PipeStatus.RUNNING) {
+ LOGGER.info(
+ String.format("Failed to start pipe [%s], the pipe is already running", pipeName));
+ return false;
+ }
+
+ return true;
+ }
+
+ public boolean checkBeforeStopPipe(String pipeName) {
+ if (!isPipeExisted(pipeName)) {
+ LOGGER.info(String.format("Failed to stop pipe [%s], the pipe does not exist", pipeName));
+ return false;
+ }
+
+ if (getPipeStatus(pipeName) == PipeStatus.STOPPED) {
+ LOGGER.info(String.format("Failed to stop pipe [%s], the pipe is already stop", pipeName));
+ return false;
+ }
+
+ return true;
+ }
+
+ public boolean checkBeforeDropPipe(String pipeName) {
+ if (isPipeExisted(pipeName)) {
+ return true;
+ }
+
+ LOGGER.info(String.format("Failed to drop pipe [%s], the pipe does not exist", pipeName));
+ return false;
+ }
+
+ private boolean isPipeExisted(String pipeName) {
+ return pipeMetaKeeper.containsPipeMeta(pipeName);
+ }
+
+ private PipeStatus getPipeStatus(String pipeName) {
+ return pipeMetaKeeper.getPipeMeta(pipeName).getRuntimeMeta().getStatus().get();
+ }
+
+ /////////////////////////////// Pipe Task Management ///////////////////////////////
+
+ public TSStatus createPipe(CreatePipePlanV2 plan) {
+ pipeMetaKeeper.addPipeMeta(
+ plan.getPipeStaticMeta().getPipeName(),
+ new PipeMeta(plan.getPipeStaticMeta(), plan.getPipeRuntimeMeta()));
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
+ public TSStatus setPipeStatus(SetPipeStatusPlanV2 plan) {
+ pipeMetaKeeper
+ .getPipeMeta(plan.getPipeName())
+ .getRuntimeMeta()
+ .getStatus()
+ .set(plan.getPipeStatus());
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
+ public TSStatus dropPipe(DropPipePlanV2 plan) {
+ pipeMetaKeeper.removePipeMeta(plan.getPipeName());
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
+ /////////////////////////////// Snapshot ///////////////////////////////
+
+ @Override
+ public boolean processTakeSnapshot(File snapshotDir) throws IOException {
+ File snapshotFile = new File(snapshotDir, SNAPSHOT_FILE_NAME);
+ if (snapshotFile.exists() && snapshotFile.isFile()) {
+ LOGGER.error(
+ "Failed to take snapshot, because snapshot file [{}] is already exist.",
+ snapshotFile.getAbsolutePath());
+ return false;
+ }
+
+ try (FileOutputStream fileOutputStream = new FileOutputStream(snapshotFile)) {
+ pipeMetaKeeper.processTakeSnapshot(fileOutputStream);
+ }
+ return true;
+ }
+
+ @Override
+ public void processLoadSnapshot(File snapshotDir) throws IOException {
+ File snapshotFile = new File(snapshotDir, SNAPSHOT_FILE_NAME);
+ if (!snapshotFile.exists() || !snapshotFile.isFile()) {
+ LOGGER.error(
+ "Failed to load snapshot,snapshot file [{}] is not exist.",
+ snapshotFile.getAbsolutePath());
+ return;
+ }
+
+ try (FileInputStream fileInputStream = new FileInputStream(snapshotFile)) {
+ pipeMetaKeeper.processLoadSnapshot(fileInputStream);
+ }
+ }
+
+ /////////////////////////////// hashCode & equals ///////////////////////////////
+
+ @Override
+ public int hashCode() {
+ return pipeMetaKeeper.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ PipeTaskInfo other = (PipeTaskInfo) obj;
+ return pipeMetaKeeper.equals(other.pipeMetaKeeper);
+ }
+
+ @Override
+ public String toString() {
+ return pipeMetaKeeper.toString();
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskOperation.java
similarity index 84%
copy from node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskOperation.java
index 04653122c2..ee42bc2462 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskOperation.java
@@ -17,6 +17,12 @@
* under the License.
*/
-package org.apache.iotdb.commons.pipe.task.meta;
+package org.apache.iotdb.confignode.persistence.pipe;
-public class PipeTaskMetaAccessor {}
+public enum PipeTaskOperation {
+ CREATE_PIPE,
+ START_PIPE,
+ STOP_PIPE,
+ DROP_PIPE,
+ ;
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java
deleted file mode 100644
index 43ecde07c1..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.persistence.sync;
-
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.sync.PipeException;
-import org.apache.iotdb.commons.exception.sync.PipeNotExistException;
-import org.apache.iotdb.commons.exception.sync.PipeSinkException;
-import org.apache.iotdb.commons.exception.sync.PipeSinkNotExistException;
-import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
-import org.apache.iotdb.commons.sync.metadata.SyncMetadata;
-import org.apache.iotdb.commons.sync.pipe.PipeInfo;
-import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.RecordPipeMessagePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan;
-import org.apache.iotdb.confignode.consensus.response.pipe.PipeResp;
-import org.apache.iotdb.confignode.consensus.response.pipe.PipeSinkResp;
-import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.locks.ReentrantLock;
-
-public class ClusterSyncInfo implements SnapshotProcessor {
-
- protected static final Logger LOGGER = LoggerFactory.getLogger(ClusterSyncInfo.class);
-
- private final SyncMetadata syncMetadata;
-
- private final ReentrantLock syncMetadataLock = new ReentrantLock();
-
- public ClusterSyncInfo() {
- syncMetadata = new SyncMetadata();
- }
- // ======================================================
- // region Implement of PipeSink
- // ======================================================
-
- /**
- * Check PipeSink before create operation
- *
- * @param createPipeSinkPlan createPipeSinkPlan
- * @throws PipeSinkException if there is PipeSink with the same name exists or attributes is
- * unsupported
- */
- public void checkAddPipeSink(CreatePipeSinkPlan createPipeSinkPlan) throws PipeSinkException {
- // check no exist
- syncMetadata.checkPipeSinkNoExist(createPipeSinkPlan.getPipeSinkInfo().getPipeSinkName());
- // check attributes
- SyncPipeUtil.parseTPipeSinkInfoAsPipeSink(createPipeSinkPlan.getPipeSinkInfo());
- }
-
- public TSStatus addPipeSink(CreatePipeSinkPlan plan) {
- TSStatus status = new TSStatus();
- try {
- syncMetadata.addPipeSink(SyncPipeUtil.parseTPipeSinkInfoAsPipeSink(plan.getPipeSinkInfo()));
- status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- } catch (PipeSinkException e) {
- LOGGER.error("failed to execute CreatePipeSinkPlan {} on ClusterSyncInfo", plan, e);
- status.setCode(TSStatusCode.CREATE_PIPE_SINK_ERROR.getStatusCode());
- LOGGER.error(e.getMessage());
- }
- return status;
- }
-
- /**
- * Check PipeSink before drop operation
- *
- * @param pipeSinkName name
- * @throws PipeSinkException if PipeSink is being used or does not exist
- */
- public void checkDropPipeSink(String pipeSinkName) throws PipeSinkException {
- syncMetadata.checkDropPipeSink(pipeSinkName);
- }
-
- public TSStatus dropPipeSink(DropPipeSinkPlan plan) {
- syncMetadata.dropPipeSink(plan.getPipeSinkName());
- return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
- }
-
- public PipeSinkResp getPipeSink(GetPipeSinkPlan plan) {
- PipeSinkResp resp = new PipeSinkResp();
- if (StringUtils.isEmpty(plan.getPipeSinkName())) {
- resp.setPipeSinkList(syncMetadata.getAllPipeSink());
- } else {
- resp.setPipeSinkList(
- Collections.singletonList(syncMetadata.getPipeSink(plan.getPipeSinkName())));
- }
- resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
- return resp;
- }
-
- // endregion
-
- // ======================================================
- // region Implement of Pipe
- // ======================================================
-
- /**
- * Check Pipe before create operation
- *
- * @param pipeInfo pipe info
- * @throws PipeException if there is Pipe with the same name exists or PipeSink does not exist
- */
- public void checkAddPipe(PipeInfo pipeInfo) throws PipeException, PipeSinkNotExistException {
- syncMetadata.checkAddPipe(pipeInfo);
- }
-
- public TSStatus preCreatePipe(PreCreatePipePlan physicalPlan) {
- syncMetadata.addPipe(physicalPlan.getPipeInfo());
- return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
- }
-
- public TSStatus setPipeStatus(SetPipeStatusPlan physicalPlan) {
- syncMetadata.setPipeStatus(physicalPlan.getPipeName(), physicalPlan.getPipeStatus());
- return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
- }
-
- public TSStatus dropPipe(DropPipePlan physicalPlan) {
- syncMetadata.dropPipe(physicalPlan.getPipeName());
- return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
- }
-
- public TSStatus recordPipeMessage(RecordPipeMessagePlan physicalPlan) {
- syncMetadata.changePipeMessage(
- physicalPlan.getPipeName(), physicalPlan.getPipeMessage().getType());
- return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
- }
-
- public PipeResp showPipe(ShowPipePlan plan) {
- PipeResp resp = new PipeResp();
- if (StringUtils.isEmpty(plan.getPipeName())) {
- // show all
- resp.setPipeInfoList(syncMetadata.getAllPipeInfos());
- } else {
- // show specific pipe
- resp.setPipeInfoList(Collections.singletonList(syncMetadata.getPipeInfo(plan.getPipeName())));
- }
- resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
- return resp;
- }
-
- /**
- * Get PipeInfo by pipeName. Check before start, stop and drop operation
- *
- * @param pipeName pipe name
- * @throws PipeNotExistException if there is Pipe does not exist
- */
- public PipeInfo getPipeInfo(String pipeName) throws PipeNotExistException {
- PipeInfo pipeInfo = syncMetadata.getPipeInfo(pipeName);
- if (pipeInfo == null) {
- throw new PipeNotExistException(pipeName);
- }
- return pipeInfo;
- }
-
- public List<PipeInfo> getAllPipeInfos() {
- return syncMetadata.getAllPipeInfos();
- }
-
- // endregion
-
- // ======================================================
- // region Implement of Lock and Unlock
- // ======================================================
-
- public void lockSyncMetadata() {
- LOGGER.info("Lock SyncMetadata");
- syncMetadataLock.lock();
- LOGGER.info("Acquire SyncMetadata lock");
- }
-
- public void unlockSyncMetadata() {
- LOGGER.info("Unlock SyncMetadata");
- syncMetadataLock.unlock();
- }
-
- // endregion
-
- // ======================================================
- // region Implement of Snapshot
- // ======================================================
-
- @Override
- public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {
- return syncMetadata.processTakeSnapshot(snapshotDir);
- }
-
- @Override
- public void processLoadSnapshot(File snapshotDir) throws TException, IOException {
- syncMetadata.processLoadSnapshot(snapshotDir);
- }
-
- // endregion
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index efa53d330d..d722d37f0d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
@@ -61,6 +62,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreatePipeOnDataNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreatePipePluginInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
@@ -68,6 +70,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TDropPipePluginInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
+import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -656,6 +659,28 @@ public class ConfigNodeProcedureEnv {
return clientHandler.getResponseList();
}
+ public List<TSStatus> createPipeOnDataNodes(PipeMeta pipeMeta) throws IOException {
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ configManager.getNodeManager().getRegisteredDataNodeLocations();
+
+ TCreatePipeOnDataNodeReq request =
+ new TCreatePipeOnDataNodeReq().setPipeMeta(pipeMeta.serialize());
+ final AsyncClientHandler<TCreatePipeOnDataNodeReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(DataNodeRequestType.CREATE_PIPE, request, dataNodeLocationMap);
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ return clientHandler.getResponseList();
+ }
+
+ public List<TSStatus> operatePipeOnDataNodes(TOperatePipeOnDataNodeReq request) {
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ configManager.getNodeManager().getRegisteredDataNodeLocations();
+
+ final AsyncClientHandler<TOperatePipeOnDataNodeReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(DataNodeRequestType.OPERATE_PIPE, request, dataNodeLocationMap);
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ return clientHandler.getResponseList();
+ }
+
public LockQueue getNodeLock() {
return nodeLock;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
index 7d2a494fc7..b479066127 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.persistence.pipe.PipePluginInfo;
+import org.apache.iotdb.confignode.manager.pipe.PipePluginCoordinator;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
@@ -116,14 +116,18 @@ public class CreatePipePluginProcedure extends AbstractNodeProcedure<CreatePipeP
private Flow executeFromLock(ConfigNodeProcedureEnv env) {
LOGGER.info("CreatePipePluginProcedure: executeFromLock({})", pipePluginMeta.getPluginName());
- final PipePluginInfo pipePluginInfo =
- env.getConfigManager().getPipeManager().getPipePluginCoordinator().getPipePluginInfo();
+ final PipePluginCoordinator pipePluginCoordinator =
+ env.getConfigManager().getPipeManager().getPipePluginCoordinator();
- pipePluginInfo.acquirePipePluginInfoLock();
+ pipePluginCoordinator.lock();
try {
- pipePluginInfo.validateBeforeCreatingPipePlugin(
- pipePluginMeta.getPluginName(), pipePluginMeta.getJarName(), pipePluginMeta.getJarMD5());
+ pipePluginCoordinator
+ .getPipePluginInfo()
+ .validateBeforeCreatingPipePlugin(
+ pipePluginMeta.getPluginName(),
+ pipePluginMeta.getJarName(),
+ pipePluginMeta.getJarMD5());
} catch (PipeManagementException e) {
// The pipe plugin has already created, we should end the procedure
LOGGER.warn(
@@ -131,7 +135,7 @@ public class CreatePipePluginProcedure extends AbstractNodeProcedure<CreatePipeP
pipePluginMeta.getPluginName(),
pipePluginMeta.getPluginName());
setFailure(new ProcedureException(e.getMessage()));
- pipePluginInfo.releasePipePluginInfoLock();
+ pipePluginCoordinator.unlock();
return Flow.NO_MORE_STATE;
}
@@ -186,11 +190,7 @@ public class CreatePipePluginProcedure extends AbstractNodeProcedure<CreatePipeP
private Flow executeFromUnlock(ConfigNodeProcedureEnv env) {
LOGGER.info("CreatePipePluginProcedure: executeFromUnlock({})", pipePluginMeta.getPluginName());
- env.getConfigManager()
- .getPipeManager()
- .getPipePluginCoordinator()
- .getPipePluginInfo()
- .releasePipePluginInfoLock();
+ env.getConfigManager().getPipeManager().getPipePluginCoordinator().unlock();
return Flow.NO_MORE_STATE;
}
@@ -214,11 +214,7 @@ public class CreatePipePluginProcedure extends AbstractNodeProcedure<CreatePipeP
private void rollbackFromLock(ConfigNodeProcedureEnv env) {
LOGGER.info("CreatePipePluginProcedure: rollbackFromLock({})", pipePluginMeta.getPluginName());
- env.getConfigManager()
- .getPipeManager()
- .getPipePluginCoordinator()
- .getPipePluginInfo()
- .releasePipePluginInfoLock();
+ env.getConfigManager().getPipeManager().getPipePluginCoordinator().unlock();
}
private void rollbackFromCreateOnConfigNodes(ConfigNodeProcedureEnv env) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index 815782a0ac..6aa4d6109d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.confignode.procedure.impl.pipe.plugin;
import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
-import org.apache.iotdb.confignode.persistence.pipe.PipePluginInfo;
+import org.apache.iotdb.confignode.manager.pipe.PipePluginCoordinator;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
@@ -102,18 +102,18 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi
private Flow executeFromLock(ConfigNodeProcedureEnv env) {
LOGGER.info("DropPipePluginProcedure: executeFromLock({})", pluginName);
- final PipePluginInfo pipePluginInfo =
- env.getConfigManager().getPipeManager().getPipePluginCoordinator().getPipePluginInfo();
+ final PipePluginCoordinator pipePluginCoordinator =
+ env.getConfigManager().getPipeManager().getPipePluginCoordinator();
- pipePluginInfo.acquirePipePluginInfoLock();
+ pipePluginCoordinator.lock();
try {
- pipePluginInfo.validateBeforeDroppingPipePlugin(pluginName);
+ pipePluginCoordinator.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName);
} catch (PipeManagementException e) {
// if the pipe plugin is not exist, we should end the procedure
LOGGER.warn(e.getMessage());
setFailure(new ProcedureException(e.getMessage()));
- pipePluginInfo.releasePipePluginInfoLock();
+ pipePluginCoordinator.unlock();
return Flow.NO_MORE_STATE;
}
@@ -149,11 +149,7 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi
private Flow executeFromUnlock(ConfigNodeProcedureEnv env) {
LOGGER.info("DropPipePluginProcedure: executeFromUnlock({})", pluginName);
- env.getConfigManager()
- .getPipeManager()
- .getPipePluginCoordinator()
- .getPipePluginInfo()
- .releasePipePluginInfoLock();
+ env.getConfigManager().getPipeManager().getPipePluginCoordinator().unlock();
return Flow.NO_MORE_STATE;
}
@@ -177,11 +173,7 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi
private void rollbackFromLock(ConfigNodeProcedureEnv env) {
LOGGER.info("DropPipePluginProcedure: rollbackFromLock({})", pluginName);
- env.getConfigManager()
- .getPipeManager()
- .getPipePluginCoordinator()
- .getPipePluginInfo()
- .releasePipePluginInfoLock();
+ env.getConfigManager().getPipeManager().getPipePluginCoordinator().unlock();
}
private void rollbackFromDropOnDataNodes(ConfigNodeProcedureEnv env) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
new file mode 100644
index 0000000000..b840d94591
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.procedure.impl.pipe.task;
+
+import org.apache.iotdb.commons.exception.sync.PipeException;
+import org.apache.iotdb.commons.exception.sync.PipeSinkException;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskOperation;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
+import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
+import org.apache.iotdb.confignode.procedure.state.pipe.task.OperatePipeTaskState;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * This procedure manage 4 kinds of PIPE operations: CREATE, START, STOP and DROP.
+ *
+ * <p>This class extends AbstractNodeProcedure to make sure that pipe task procedures can be
+ * executed in sequence and node procedures can be locked when a pipe task procedure is running.
+ */
+abstract class AbstractOperatePipeProcedureV2 extends AbstractNodeProcedure<OperatePipeTaskState> {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(AbstractOperatePipeProcedureV2.class);
+
+ private static final int RETRY_THRESHOLD = 3;
+
+ abstract PipeTaskOperation getOperation();
+
+ /**
+ * Execute at state VALIDATE_TASK
+ *
+ * @return true if procedure can finish directly
+ */
+ abstract boolean executeFromValidateTask(ConfigNodeProcedureEnv env)
+ throws PipeException, PipeSinkException;
+
+ /** Execute at state CALCULATE_INFO_FOR_TASK */
+ abstract void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) throws PipeException;
+
+ /** Execute at state WRITE_CONFIG_NODE_CONSENSUS */
+ abstract void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env)
+ throws PipeException;
+
+ /** Execute at state OPERATE_ON_DATA_NODES */
+ abstract void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
+ throws PipeException, IOException;
+
+ @Override
+ protected Flow executeFromState(ConfigNodeProcedureEnv env, OperatePipeTaskState state)
+ throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+ try {
+ switch (state) {
+ case VALIDATE_TASK:
+ env.getConfigManager().getPipeManager().getPipeTaskCoordinator().lock();
+ if (!executeFromValidateTask(env)) {
+ env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
+ return Flow.NO_MORE_STATE;
+ }
+ setNextState(OperatePipeTaskState.CALCULATE_INFO_FOR_TASK);
+ break;
+ case CALCULATE_INFO_FOR_TASK:
+ executeFromCalculateInfoForTask(env);
+ setNextState(OperatePipeTaskState.WRITE_CONFIG_NODE_CONSENSUS);
+ break;
+ case WRITE_CONFIG_NODE_CONSENSUS:
+ executeFromWriteConfigNodeConsensus(env);
+ setNextState(OperatePipeTaskState.OPERATE_ON_DATA_NODES);
+ break;
+ case OPERATE_ON_DATA_NODES:
+ executeFromOperateOnDataNodes(env);
+ env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
+ return Flow.NO_MORE_STATE;
+ }
+ } catch (PipeException | PipeSinkException | IOException e) {
+ if (isRollbackSupported(state)) {
+ LOGGER.error("Fail in OperatePipeProcedure", e);
+ setFailure(new ProcedureException(e.getMessage()));
+ } else {
+ LOGGER.error("Retrievable error trying to {} at state [{}]", getOperation(), state, e);
+ if (getCycles() > RETRY_THRESHOLD) {
+ setFailure(
+ new ProcedureException(
+ String.format("Fail to %s because %s", getOperation().name(), e.getMessage())));
+ }
+ }
+ }
+ return Flow.HAS_MORE_STATE;
+ }
+
+ @Override
+ protected boolean isRollbackSupported(OperatePipeTaskState state) {
+ return true;
+ }
+
+ @Override
+ protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeTaskState state)
+ throws IOException, InterruptedException, ProcedureException {
+ switch (state) {
+ case VALIDATE_TASK:
+ rollbackFromValidateTask(env);
+ env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
+ break;
+ case CALCULATE_INFO_FOR_TASK:
+ rollbackFromCalculateInfoForTask(env);
+ break;
+ case WRITE_CONFIG_NODE_CONSENSUS:
+ rollbackFromWriteConfigNodeConsensus(env);
+ break;
+ case OPERATE_ON_DATA_NODES:
+ rollbackFromOperateOnDataNodes(env);
+ break;
+ default:
+ LOGGER.error("Unsupported roll back STATE [{}]", state);
+ }
+ }
+
+ protected abstract void rollbackFromValidateTask(ConfigNodeProcedureEnv env);
+
+ protected abstract void rollbackFromCalculateInfoForTask(ConfigNodeProcedureEnv env);
+
+ protected abstract void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env);
+
+ protected abstract void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env);
+
+ @Override
+ protected OperatePipeTaskState getState(int stateId) {
+ return OperatePipeTaskState.values()[stateId];
+ }
+
+ @Override
+ protected int getStateId(OperatePipeTaskState state) {
+ return state.ordinal();
+ }
+
+ @Override
+ protected OperatePipeTaskState getInitialState() {
+ return OperatePipeTaskState.VALIDATE_TASK;
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
new file mode 100644
index 0000000000..7b7c6def4e
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.pipe.task;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.pipe.task.meta.PipeConsensusGroupTaskMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
+import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskOperation;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CreatePipeProcedureV2.class);
+
+ private TCreatePipeReq createPipeRequest;
+
+ private PipeStaticMeta pipeStaticMeta;
+ private PipeRuntimeMeta pipeRuntimeMeta;
+
+ public CreatePipeProcedureV2() {
+ super();
+ }
+
+ public CreatePipeProcedureV2(TCreatePipeReq createPipeRequest) throws PipeException {
+ super();
+ this.createPipeRequest = createPipeRequest;
+ }
+
+ @Override
+ PipeTaskOperation getOperation() {
+ return PipeTaskOperation.CREATE_PIPE;
+ }
+
+ @Override
+ boolean executeFromValidateTask(ConfigNodeProcedureEnv env) {
+ LOGGER.info(
+ "CreatePipeProcedureV2: executeFromValidateTask({})", createPipeRequest.getPipeName());
+
+ final PipeInfo pipeInfo = env.getConfigManager().getPipeManager().getPipeInfo();
+ return pipeInfo.getPipePluginInfo().checkBeforeCreatePipe(createPipeRequest)
+ && pipeInfo.getPipeTaskInfo().checkBeforeCreatePipe(createPipeRequest);
+ }
+
+ @Override
+ void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) throws PipeManagementException {
+ LOGGER.info(
+ "CreatePipeProcedureV2: executeFromCalculateInfoForTask({})",
+ createPipeRequest.getPipeName());
+
+ pipeStaticMeta =
+ new PipeStaticMeta(
+ createPipeRequest.getPipeName(),
+ System.currentTimeMillis(),
+ createPipeRequest.getCollectorAttributes(),
+ createPipeRequest.getProcessorAttributes(),
+ createPipeRequest.getConnectorAttributes());
+
+ final Map<TConsensusGroupId, PipeConsensusGroupTaskMeta> consensusGroupIdToTaskMetaMap =
+ new HashMap<>();
+ env.getConfigManager()
+ .getLoadManager()
+ .getLatestRegionLeaderMap()
+ .forEach(
+ (region, leader) -> {
+ consensusGroupIdToTaskMetaMap.put(region, new PipeConsensusGroupTaskMeta(0, leader));
+ });
+ pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);
+ }
+
+ @Override
+ void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env)
+ throws PipeManagementException {
+ LOGGER.info(
+ "CreatePipeProcedureV2: executeFromWriteConfigNodeConsensus({})",
+ createPipeRequest.getPipeName());
+
+ final ConsensusWriteResponse response =
+ env.getConfigManager()
+ .getConsensusManager()
+ .write(new CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta));
+ if (!response.isSuccessful()) {
+ throw new PipeManagementException(response.getErrorMessage());
+ }
+ }
+
+ @Override
+ void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
+ throws PipeManagementException, IOException {
+ LOGGER.info(
+ "CreatePipeProcedureV2: executeFromOperateOnDataNodes({})",
+ createPipeRequest.getPipeName());
+
+ if (RpcUtils.squashResponseStatusList(
+ env.createPipeOnDataNodes(new PipeMeta(pipeStaticMeta, pipeRuntimeMeta)))
+ .getCode()
+ != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeManagementException(
+ String.format(
+ "Failed to create pipe instance [%s] on data nodes",
+ createPipeRequest.getPipeName()));
+ }
+ }
+
+ @Override
+ protected void rollbackFromValidateTask(ConfigNodeProcedureEnv env) {
+ LOGGER.info(
+ "CreatePipeProcedureV2: rollbackFromValidateTask({})", createPipeRequest.getPipeName());
+ // Do nothing
+ }
+
+ @Override
+ protected void rollbackFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
+ LOGGER.info(
+ "CreatePipeProcedureV2: rollbackFromCalculateInfoForTask({})",
+ createPipeRequest.getPipeName());
+ // Do nothing
+ }
+
+ @Override
+ protected void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
+ LOGGER.info(
+ "CreatePipeProcedureV2: rollbackFromWriteConfigNodeConsensus({})",
+ createPipeRequest.getPipeName());
+
+ final ConsensusWriteResponse response =
+ env.getConfigManager()
+ .getConsensusManager()
+ .write(new DropPipePlanV2(createPipeRequest.getPipeName()));
+ if (!response.isSuccessful()) {
+ throw new PipeManagementException(response.getErrorMessage());
+ }
+ }
+
+ @Override
+ protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
+ LOGGER.info(
+ "CreatePipeProcedureV2: rollbackFromOperateOnDataNodes({})",
+ createPipeRequest.getPipeName());
+
+ final TOperatePipeOnDataNodeReq request =
+ new TOperatePipeOnDataNodeReq()
+ .setPipeName(createPipeRequest.getPipeName())
+ .setOperation((byte) PipeTaskOperation.DROP_PIPE.ordinal());
+ if (RpcUtils.squashResponseStatusList(env.operatePipeOnDataNodes(request)).getCode()
+ != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeManagementException(
+ String.format(
+ "Failed to rollback from operate on data nodes for task [%s]",
+ createPipeRequest.getPipeName()));
+ }
+ }
+
+ @Override
+ public void serialize(DataOutputStream stream) throws IOException {
+ stream.writeShort(ProcedureType.CREATE_PIPE_PROCEDURE_V2.getTypeCode());
+ super.serialize(stream);
+ ReadWriteIOUtils.write(createPipeRequest.getPipeName(), stream);
+ stream.writeInt(createPipeRequest.getCollectorAttributesSize());
+ for (Map.Entry<String, String> entry : createPipeRequest.getCollectorAttributes().entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), stream);
+ ReadWriteIOUtils.write(entry.getValue(), stream);
+ }
+ stream.writeInt(createPipeRequest.getProcessorAttributesSize());
+ for (Map.Entry<String, String> entry : createPipeRequest.getProcessorAttributes().entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), stream);
+ ReadWriteIOUtils.write(entry.getValue(), stream);
+ }
+ stream.writeInt(createPipeRequest.getConnectorAttributesSize());
+ for (Map.Entry<String, String> entry : createPipeRequest.getConnectorAttributes().entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), stream);
+ ReadWriteIOUtils.write(entry.getValue(), stream);
+ }
+ if (pipeStaticMeta != null) {
+ stream.writeBoolean(true);
+ pipeStaticMeta.serialize(stream);
+ } else {
+ stream.writeBoolean(false);
+ }
+ }
+
+ @Override
+ public void deserialize(ByteBuffer byteBuffer) {
+ super.deserialize(byteBuffer);
+ createPipeRequest =
+ new TCreatePipeReq()
+ .setPipeName(ReadWriteIOUtils.readString(byteBuffer))
+ .setCollectorAttributes(new HashMap<>())
+ .setProcessorAttributes(new HashMap<>())
+ .setConnectorAttributes(new HashMap<>());
+ int size = byteBuffer.getInt();
+ for (int i = 0; i < size; ++i) {
+ createPipeRequest
+ .getCollectorAttributes()
+ .put(ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readString(byteBuffer));
+ }
+ size = byteBuffer.getInt();
+ for (int i = 0; i < size; ++i) {
+ createPipeRequest
+ .getProcessorAttributes()
+ .put(ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readString(byteBuffer));
+ }
+ size = byteBuffer.getInt();
+ for (int i = 0; i < size; ++i) {
+ createPipeRequest
+ .getConnectorAttributes()
+ .put(ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readString(byteBuffer));
+ }
+ if (ReadWriteIOUtils.readBool(byteBuffer)) {
+ pipeStaticMeta = PipeStaticMeta.deserialize(byteBuffer);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CreatePipeProcedureV2 that = (CreatePipeProcedureV2) o;
+ return createPipeRequest.getPipeName().equals(that.createPipeRequest.getPipeName());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(createPipeRequest.getPipeName());
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
new file mode 100644
index 0000000000..36bd95094d
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.procedure.impl.pipe.task;
+
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskOperation;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class DropPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DropPipeProcedureV2.class);
+
+ private String pipeName;
+
+ public DropPipeProcedureV2() {
+ super();
+ }
+
+ public DropPipeProcedureV2(String pipeName) throws PipeException {
+ super();
+ this.pipeName = pipeName;
+ }
+
+ @Override
+ PipeTaskOperation getOperation() {
+ return PipeTaskOperation.DROP_PIPE;
+ }
+
+ @Override
+ boolean executeFromValidateTask(ConfigNodeProcedureEnv env) throws PipeManagementException {
+ LOGGER.info("DropPipeProcedureV2: executeFromValidateTask({})", pipeName);
+
+ return env.getConfigManager()
+ .getPipeManager()
+ .getPipeInfo()
+ .getPipeTaskInfo()
+ .checkBeforeDropPipe(pipeName);
+ }
+
+ @Override
+ void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) throws PipeManagementException {
+ LOGGER.info("DropPipeProcedureV2: executeFromCalculateInfoForTask({})", pipeName);
+ // Do nothing
+ }
+
+ @Override
+ void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env)
+ throws PipeManagementException {
+ LOGGER.info("DropPipeProcedureV2: executeFromWriteConfigNodeConsensus({})", pipeName);
+
+ final ConsensusWriteResponse response =
+ env.getConfigManager().getConsensusManager().write(new DropPipePlanV2(pipeName));
+ if (!response.isSuccessful()) {
+ throw new PipeManagementException(response.getErrorMessage());
+ }
+ }
+
+ @Override
+ void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws PipeManagementException {
+ LOGGER.info("DropPipeProcedureV2: executeFromOperateOnDataNodes({})", pipeName);
+
+ final TOperatePipeOnDataNodeReq request =
+ new TOperatePipeOnDataNodeReq()
+ .setPipeName(pipeName)
+ .setOperation((byte) PipeTaskOperation.DROP_PIPE.ordinal());
+ if (RpcUtils.squashResponseStatusList(env.operatePipeOnDataNodes(request)).getCode()
+ != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeManagementException(
+ String.format("Failed to drop pipe instance [%s] on data nodes", pipeName));
+ }
+ }
+
+ @Override
+ protected void rollbackFromValidateTask(ConfigNodeProcedureEnv env) {
+ LOGGER.info("DropPipeProcedureV2: rollbackFromValidateTask({})", pipeName);
+ // Do nothing
+ }
+
+ @Override
+ protected void rollbackFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
+ LOGGER.info("DropPipeProcedureV2: rollbackFromCalculateInfoForTask({})", pipeName);
+ // Do nothing
+ }
+
+ @Override
+ protected void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
+ LOGGER.info("DropPipeProcedureV2: rollbackFromWriteConfigNodeConsensus({})", pipeName);
+ // Do nothing
+ }
+
+ @Override
+ protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
+ LOGGER.info("DropPipeProcedureV2: rollbackFromOperateOnDataNodes({})", pipeName);
+ // Do nothing
+ }
+
+ @Override
+ public void serialize(DataOutputStream stream) throws IOException {
+ stream.writeShort(ProcedureType.DROP_PIPE_PROCEDURE_V2.getTypeCode());
+ super.serialize(stream);
+ ReadWriteIOUtils.write(pipeName, stream);
+ }
+
+ @Override
+ public void deserialize(ByteBuffer byteBuffer) {
+ super.deserialize(byteBuffer);
+ pipeName = ReadWriteIOUtils.readString(byteBuffer);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DropPipeProcedureV2 that = (DropPipeProcedureV2) o;
+ return pipeName.equals(that.pipeName);
+ }
+
+ @Override
+ public int hashCode() {
+ return pipeName.hashCode();
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
new file mode 100644
index 0000000000..786526714e
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.procedure.impl.pipe.task;
+
+import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskOperation;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class StartPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(StartPipeProcedureV2.class);
+
+ private String pipeName;
+
+ public StartPipeProcedureV2() {
+ super();
+ }
+
+ public StartPipeProcedureV2(String pipeName) throws PipeException {
+ super();
+ this.pipeName = pipeName;
+ }
+
+ @Override
+ PipeTaskOperation getOperation() {
+ return PipeTaskOperation.START_PIPE;
+ }
+
+ @Override
+ boolean executeFromValidateTask(ConfigNodeProcedureEnv env) throws PipeManagementException {
+ LOGGER.info("StartPipeProcedureV2: executeFromValidateTask({})", pipeName);
+
+ return env.getConfigManager()
+ .getPipeManager()
+ .getPipeInfo()
+ .getPipeTaskInfo()
+ .checkBeforeStartPipe(pipeName);
+ }
+
+ @Override
+ void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) throws PipeManagementException {
+ LOGGER.info("StartPipeProcedureV2: executeFromCalculateInfoForTask({})", pipeName);
+ // Do nothing
+ }
+
+ @Override
+ void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env)
+ throws PipeManagementException {
+ LOGGER.info("StartPipeProcedureV2: executeFromWriteConfigNodeConsensus({})", pipeName);
+
+ final ConsensusWriteResponse response =
+ env.getConfigManager()
+ .getConsensusManager()
+ .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.RUNNING));
+ if (!response.isSuccessful()) {
+ throw new PipeManagementException(response.getErrorMessage());
+ }
+ }
+
+ @Override
+ void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws PipeManagementException {
+ LOGGER.info("StartPipeProcedureV2: executeFromOperateOnDataNodes({})", pipeName);
+
+ final TOperatePipeOnDataNodeReq request =
+ new TOperatePipeOnDataNodeReq()
+ .setPipeName(pipeName)
+ .setOperation((byte) PipeTaskOperation.START_PIPE.ordinal());
+ if (RpcUtils.squashResponseStatusList(env.operatePipeOnDataNodes(request)).getCode()
+ != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeManagementException(
+ String.format("Failed to start pipe instance [%s] on data nodes", pipeName));
+ }
+ }
+
+ @Override
+ protected void rollbackFromValidateTask(ConfigNodeProcedureEnv env) {
+ LOGGER.info("StartPipeProcedureV2: rollbackFromValidateTask({})", pipeName);
+ // Do nothing
+ }
+
+ @Override
+ protected void rollbackFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
+ LOGGER.info("StartPipeProcedureV2: rollbackFromCalculateInfoForTask({})", pipeName);
+ // Do nothing
+ }
+
+ @Override
+ protected void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
+ LOGGER.info("StartPipeProcedureV2: rollbackFromWriteConfigNodeConsensus({})", pipeName);
+
+ final ConsensusWriteResponse response =
+ env.getConfigManager()
+ .getConsensusManager()
+ .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.STOPPED));
+ if (!response.isSuccessful()) {
+ throw new PipeManagementException(response.getErrorMessage());
+ }
+ }
+
+ @Override
+ protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
+ throws PipeManagementException {
+ LOGGER.info("StartPipeProcedureV2: rollbackFromOperateOnDataNodes({})", pipeName);
+
+ final TOperatePipeOnDataNodeReq request =
+ new TOperatePipeOnDataNodeReq()
+ .setPipeName(pipeName)
+ .setOperation((byte) PipeTaskOperation.STOP_PIPE.ordinal());
+ if (RpcUtils.squashResponseStatusList(env.operatePipeOnDataNodes(request)).getCode()
+ != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeManagementException(
+ String.format("Failed to rollback from start on data nodes for task [%s]", pipeName));
+ }
+ }
+
+ @Override
+ public void serialize(DataOutputStream stream) throws IOException {
+ stream.writeShort(ProcedureType.START_PIPE_PROCEDURE_V2.getTypeCode());
+ super.serialize(stream);
+ ReadWriteIOUtils.write(pipeName, stream);
+ }
+
+ @Override
+ public void deserialize(ByteBuffer byteBuffer) {
+ super.deserialize(byteBuffer);
+ pipeName = ReadWriteIOUtils.readString(byteBuffer);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ StartPipeProcedureV2 that = (StartPipeProcedureV2) o;
+ return pipeName.equals(that.pipeName);
+ }
+
+ @Override
+ public int hashCode() {
+ return pipeName.hashCode();
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
new file mode 100644
index 0000000000..0bb231b96d
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.procedure.impl.pipe.task;
+
+import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskOperation;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class StopPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(StopPipeProcedureV2.class);
+
+ private String pipeName;
+
+ public StopPipeProcedureV2() {
+ super();
+ }
+
+ public StopPipeProcedureV2(String pipeName) throws PipeException {
+ super();
+ this.pipeName = pipeName;
+ }
+
+ @Override
+ PipeTaskOperation getOperation() {
+ return PipeTaskOperation.STOP_PIPE;
+ }
+
+ @Override
+ boolean executeFromValidateTask(ConfigNodeProcedureEnv env) throws PipeManagementException {
+ LOGGER.info("StopPipeProcedureV2: executeFromValidateTask({})", pipeName);
+
+ return env.getConfigManager()
+ .getPipeManager()
+ .getPipeInfo()
+ .getPipeTaskInfo()
+ .checkBeforeStopPipe(pipeName);
+ }
+
+ @Override
+ void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) throws PipeManagementException {
+ LOGGER.info("StopPipeProcedureV2: executeFromCalculateInfoForTask({})", pipeName);
+ // Do nothing
+ }
+
+ @Override
+ void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env)
+ throws PipeManagementException {
+ LOGGER.info("StopPipeProcedureV2: executeFromWriteConfigNodeConsensus({})", pipeName);
+
+ final ConsensusWriteResponse response =
+ env.getConfigManager()
+ .getConsensusManager()
+ .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.STOPPED));
+ if (!response.isSuccessful()) {
+ throw new PipeManagementException(response.getErrorMessage());
+ }
+ }
+
+ @Override
+ void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws PipeManagementException {
+ LOGGER.info("StopPipeProcedureV2: executeFromOperateOnDataNodes({})", pipeName);
+
+ final TOperatePipeOnDataNodeReq request =
+ new TOperatePipeOnDataNodeReq()
+ .setPipeName(pipeName)
+ .setOperation((byte) PipeTaskOperation.STOP_PIPE.ordinal());
+ if (RpcUtils.squashResponseStatusList(env.operatePipeOnDataNodes(request)).getCode()
+ != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeManagementException(
+ String.format("Failed to stop pipe instance [%s] on data nodes", pipeName));
+ }
+ }
+
+ @Override
+ protected void rollbackFromValidateTask(ConfigNodeProcedureEnv env) {
+ LOGGER.info("StopPipeProcedureV2: rollbackFromValidateTask({})", pipeName);
+ // Do nothing
+ }
+
+ @Override
+ protected void rollbackFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
+ LOGGER.info("StopPipeProcedureV2: rollbackFromCalculateInfoForTask({})", pipeName);
+ // Do nothing
+ }
+
+ @Override
+ protected void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
+ LOGGER.info("StopPipeProcedureV2: rollbackFromWriteConfigNodeConsensus({})", pipeName);
+
+ final ConsensusWriteResponse response =
+ env.getConfigManager()
+ .getConsensusManager()
+ .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.RUNNING));
+ if (!response.isSuccessful()) {
+ throw new PipeManagementException(response.getErrorMessage());
+ }
+ }
+
+ @Override
+ protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
+ throws PipeManagementException {
+ LOGGER.info("StopPipeProcedureV2: rollbackFromOperateOnDataNodes({})", pipeName);
+
+ final TOperatePipeOnDataNodeReq request =
+ new TOperatePipeOnDataNodeReq()
+ .setPipeName(pipeName)
+ .setOperation((byte) PipeTaskOperation.START_PIPE.ordinal());
+ if (RpcUtils.squashResponseStatusList(env.operatePipeOnDataNodes(request)).getCode()
+ != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeManagementException(
+ String.format("Failed to rollback from stop on data nodes for task [%s]", pipeName));
+ }
+ }
+
+ @Override
+ public void serialize(DataOutputStream stream) throws IOException {
+ stream.writeShort(ProcedureType.STOP_PIPE_PROCEDURE_V2.getTypeCode());
+ super.serialize(stream);
+ ReadWriteIOUtils.write(pipeName, stream);
+ }
+
+ @Override
+ public void deserialize(ByteBuffer byteBuffer) {
+ super.deserialize(byteBuffer);
+ pipeName = ReadWriteIOUtils.readString(byteBuffer);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ StopPipeProcedureV2 that = (StopPipeProcedureV2) o;
+ return pipeName.equals(that.pipeName);
+ }
+
+ @Override
+ public int hashCode() {
+ return pipeName.hashCode();
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/AbstractOperatePipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/AbstractOperatePipeProcedure.java
deleted file mode 100644
index c1ffa6bdf1..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/AbstractOperatePipeProcedure.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.procedure.impl.sync;
-
-import org.apache.iotdb.commons.exception.sync.PipeException;
-import org.apache.iotdb.commons.exception.sync.PipeSinkException;
-import org.apache.iotdb.commons.sync.pipe.SyncOperation;
-import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
-import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
-import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
-import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
-import org.apache.iotdb.confignode.procedure.impl.statemachine.StateMachineProcedure;
-import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
-import org.apache.iotdb.confignode.procedure.state.sync.OperatePipeState;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** This procedure manage three kinds of PIPE operations: CREATE, START and STOP */
-abstract class AbstractOperatePipeProcedure
- extends StateMachineProcedure<ConfigNodeProcedureEnv, OperatePipeState> {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(AbstractOperatePipeProcedure.class);
-
- private static final int RETRY_THRESHOLD = 3;
-
- /**
- * Execute at state OPERATE_CHECK
- *
- * @return true if procedure can finish directly
- */
- abstract boolean executeCheckCanSkip(ConfigNodeProcedureEnv env)
- throws PipeException, PipeSinkException;
-
- /** Execute at state PRE_OPERATE_PIPE_CONFIGNODE */
- abstract void executePreOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException;
-
- /** Execute at state OPERATE_PIPE_DATANODE */
- abstract void executeOperatePipeOnDataNode(ConfigNodeProcedureEnv env) throws PipeException;
-
- /** Execute at state OPERATE_PIPE_CONFIGNODE */
- abstract void executeOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException;
-
- abstract SyncOperation getOperation();
-
- @Override
- protected Flow executeFromState(ConfigNodeProcedureEnv env, OperatePipeState state)
- throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
- try {
- switch (state) {
- case OPERATE_CHECK:
- env.getConfigManager().getSyncManager().lockSyncMetadata();
- if (executeCheckCanSkip(env)) {
- env.getConfigManager().getSyncManager().unlockSyncMetadata();
- return Flow.NO_MORE_STATE;
- }
- setNextState(OperatePipeState.PRE_OPERATE_PIPE_CONFIGNODE);
- break;
- case PRE_OPERATE_PIPE_CONFIGNODE:
- executePreOperatePipeOnConfigNode(env);
- setNextState(OperatePipeState.OPERATE_PIPE_DATANODE);
- break;
- case OPERATE_PIPE_DATANODE:
- executeOperatePipeOnDataNode(env);
- setNextState(OperatePipeState.OPERATE_PIPE_CONFIGNODE);
- break;
- case OPERATE_PIPE_CONFIGNODE:
- executeOperatePipeOnConfigNode(env);
- env.getConfigManager().getSyncManager().unlockSyncMetadata();
- return Flow.NO_MORE_STATE;
- }
- } catch (PipeException | PipeSinkException e) {
- if (isRollbackSupported(state)) {
- LOGGER.error("Fail in OperatePipeProcedure", e);
- setFailure(new ProcedureException(e.getMessage()));
- } else {
- LOGGER.error("Retrievable error trying to {} at state [{}]", getOperation(), state, e);
- if (getCycles() > RETRY_THRESHOLD) {
- setFailure(
- new ProcedureException(
- String.format("Fail to %s because %s", getOperation().name(), e.getMessage())));
- }
- }
- }
- return Flow.HAS_MORE_STATE;
- }
-
- @Override
- protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv env) {
- env.getSchedulerLock().lock();
- try {
- if (env.getPipeLock().tryLock(this)) {
- LOGGER.info("procedureId {} acquire lock.", getProcId());
- return ProcedureLockState.LOCK_ACQUIRED;
- }
- env.getPipeLock().waitProcedure(this);
- LOGGER.info("procedureId {} wait for lock.", getProcId());
- return ProcedureLockState.LOCK_EVENT_WAIT;
- } finally {
- env.getSchedulerLock().unlock();
- }
- }
-
- @Override
- protected void releaseLock(ConfigNodeProcedureEnv env) {
- env.getSchedulerLock().lock();
- try {
- LOGGER.info("procedureId {} release lock.", getProcId());
- if (env.getPipeLock().releaseLock(this)) {
- env.getPipeLock().wakeWaitingProcedures(env.getScheduler());
- }
- } finally {
- env.getSchedulerLock().unlock();
- }
- }
-
- @Override
- protected OperatePipeState getState(int stateId) {
- return OperatePipeState.values()[stateId];
- }
-
- @Override
- protected int getStateId(OperatePipeState state) {
- return state.ordinal();
- }
-
- @Override
- protected OperatePipeState getInitialState() {
- return OperatePipeState.OPERATE_CHECK;
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/CreatePipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/CreatePipeProcedure.java
index 24dc3cef22..b1f22e5711 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/CreatePipeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/CreatePipeProcedure.java
@@ -18,38 +18,24 @@
*/
package org.apache.iotdb.confignode.procedure.impl.sync;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.sync.PipeException;
-import org.apache.iotdb.commons.exception.sync.PipeSinkException;
import org.apache.iotdb.commons.sync.pipe.PipeInfo;
-import org.apache.iotdb.commons.sync.pipe.PipeStatus;
-import org.apache.iotdb.commons.sync.pipe.SyncOperation;
-import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
-import org.apache.iotdb.confignode.procedure.state.sync.OperatePipeState;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
-import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.HashSet;
-import java.util.Map;
import java.util.Objects;
import java.util.Set;
-public class CreatePipeProcedure extends AbstractOperatePipeProcedure {
- private static final Logger LOGGER = LoggerFactory.getLogger(CreatePipeProcedure.class);
+// Empty procedure for old sync, restored only for compatibility
+public class CreatePipeProcedure extends Procedure<ConfigNodeProcedureEnv> {
private PipeInfo pipeInfo;
private Set<Integer> executedDataNodeIds = new HashSet<>();
@@ -58,111 +44,20 @@ public class CreatePipeProcedure extends AbstractOperatePipeProcedure {
super();
}
- public CreatePipeProcedure(TCreatePipeReq req) throws PipeException {
- super();
- this.pipeInfo = SyncPipeUtil.parseTCreatePipeReqAsPipeInfo(req, System.currentTimeMillis());
- }
-
- @TestOnly
- public void setExecutedDataNodeIds(Set<Integer> executedDataNodeIds) {
- this.executedDataNodeIds = executedDataNodeIds;
- }
-
@Override
- boolean executeCheckCanSkip(ConfigNodeProcedureEnv env) throws PipeException, PipeSinkException {
- LOGGER.info("Start to create PIPE [{}]", pipeInfo.getPipeName());
- env.getConfigManager().getSyncManager().checkAddPipe(pipeInfo);
- return false;
+ protected Procedure<ConfigNodeProcedureEnv>[] execute(
+ ConfigNodeProcedureEnv configNodeProcedureEnv)
+ throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+ return new Procedure[0];
}
@Override
- void executePreOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
- LOGGER.info("Start to pre-create PIPE [{}] on Config Nodes", pipeInfo.getPipeName());
- TSStatus status = env.getConfigManager().getSyncManager().preCreatePipe(pipeInfo);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new PipeException(status.getMessage());
- }
- }
-
- @Override
- void executeOperatePipeOnDataNode(ConfigNodeProcedureEnv env) throws PipeException {
- LOGGER.info("Start to broadcast create PIPE [{}] on Data Nodes", pipeInfo.getPipeName());
- Map<Integer, TSStatus> responseMap =
- env.getConfigManager().getSyncManager().preCreatePipeOnDataNodes(pipeInfo);
- TSStatus status = RpcUtils.squashResponseStatusList(new ArrayList<>(responseMap.values()));
- executedDataNodeIds.addAll(responseMap.keySet());
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new PipeException(
- String.format(
- "Fail to create PIPE [%s] because %s.",
- pipeInfo.getPipeName(),
- StringUtils.join(
- responseMap.values().stream()
- .filter(i -> i.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode())
- .map(TSStatus::getMessage)
- .toArray(),
- ", ")));
- }
- }
+ protected void rollback(ConfigNodeProcedureEnv configNodeProcedureEnv)
+ throws IOException, InterruptedException, ProcedureException {}
@Override
- void executeOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
- LOGGER.info("Start to create PIPE [{}] on Config Nodes", pipeInfo.getPipeName());
- TSStatus status =
- env.getConfigManager()
- .getSyncManager()
- .setPipeStatus(pipeInfo.getPipeName(), PipeStatus.STOP);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new PipeException(status.getMessage());
- }
- }
-
- @Override
- SyncOperation getOperation() {
- return SyncOperation.CREATE_PIPE;
- }
-
- @Override
- protected boolean isRollbackSupported(OperatePipeState state) {
- switch (state) {
- case OPERATE_CHECK:
- case PRE_OPERATE_PIPE_CONFIGNODE:
- case OPERATE_PIPE_DATANODE:
- return true;
- default:
- return false;
- }
- }
-
- @Override
- protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeState state)
- throws IOException, InterruptedException, ProcedureException {
- LOGGER.info("Roll back CreatePipeProcedure at STATE [{}]", state);
- switch (state) {
- case OPERATE_CHECK:
- env.getConfigManager().getSyncManager().unlockSyncMetadata();
- break;
- case PRE_OPERATE_PIPE_CONFIGNODE:
- TSStatus status = env.getConfigManager().getSyncManager().dropPipe(pipeInfo.getPipeName());
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new ProcedureException(
- String.format(
- "Failed to create pipe and failed to roll back because %s. Please execute [DROP PIPE %s] manually.",
- status.getMessage(), pipeInfo.getPipeName()));
- }
- break;
- case OPERATE_PIPE_DATANODE:
- env.getConfigManager()
- .getSyncManager()
- .operatePipeOnDataNodesForRollback(
- pipeInfo.getPipeName(),
- pipeInfo.getCreateTime(),
- SyncOperation.DROP_PIPE,
- executedDataNodeIds);
- break;
- default:
- LOGGER.error("Unsupported roll back STATE [{}]", state);
- }
+ protected boolean abort(ConfigNodeProcedureEnv configNodeProcedureEnv) {
+ return false;
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/DropPipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/DropPipeProcedure.java
index fd7f6f36a8..aa61bc8949 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/DropPipeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/DropPipeProcedure.java
@@ -18,32 +18,21 @@
*/
package org.apache.iotdb.confignode.procedure.impl.sync;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.sync.PipeException;
-import org.apache.iotdb.commons.sync.pipe.PipeInfo;
-import org.apache.iotdb.commons.sync.pipe.PipeStatus;
-import org.apache.iotdb.commons.sync.pipe.SyncOperation;
+import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
-import org.apache.iotdb.confignode.procedure.state.sync.OperatePipeState;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Map;
import java.util.Objects;
-public class DropPipeProcedure extends AbstractOperatePipeProcedure {
- private static final Logger LOGGER = LoggerFactory.getLogger(DropPipeProcedure.class);
+// Empty procedure for old sync, restored only for compatibility
+public class DropPipeProcedure extends Procedure<ConfigNodeProcedureEnv> {
private String pipeName;
@@ -51,80 +40,20 @@ public class DropPipeProcedure extends AbstractOperatePipeProcedure {
super();
}
- public DropPipeProcedure(String pipeName) throws PipeException {
- super();
- this.pipeName = pipeName;
- }
-
- @Override
- boolean executeCheckCanSkip(ConfigNodeProcedureEnv env) throws PipeException {
- LOGGER.info("Start to drop PIPE [{}]", pipeName);
- // throw PipeNotExistException if pipe not exist
- PipeInfo pipeInfo = env.getConfigManager().getSyncManager().getPipeInfo(pipeName);
- return false;
- }
-
@Override
- void executePreOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
- LOGGER.info("Start to pre-drop PIPE [{}] on Config Nodes", pipeName);
- TSStatus status =
- env.getConfigManager().getSyncManager().setPipeStatus(pipeName, PipeStatus.DROP);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new PipeException(status.getMessage());
- }
+ protected Procedure<ConfigNodeProcedureEnv>[] execute(
+ ConfigNodeProcedureEnv configNodeProcedureEnv)
+ throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+ return new Procedure[0];
}
@Override
- void executeOperatePipeOnDataNode(ConfigNodeProcedureEnv env) throws PipeException {
- LOGGER.info("Start to broadcast drop PIPE [{}] on Data Nodes", pipeName);
- Map<Integer, TSStatus> responseMap =
- env.getConfigManager()
- .getSyncManager()
- .operatePipeOnDataNodes(pipeName, SyncOperation.DROP_PIPE);
- TSStatus status = RpcUtils.squashResponseStatusList(new ArrayList<>(responseMap.values()));
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new PipeException(
- String.format(
- "Fail to drop PIPE [%s] because %s. Please execute [DROP PIPE %s] later to retry.",
- pipeName,
- StringUtils.join(
- responseMap.values().stream()
- .filter(i -> i.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode())
- .map(TSStatus::getMessage)
- .toArray(),
- ", "),
- pipeName));
- }
- }
+ protected void rollback(ConfigNodeProcedureEnv configNodeProcedureEnv)
+ throws IOException, InterruptedException, ProcedureException {}
@Override
- void executeOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
- LOGGER.info("Start to drop PIPE [{}] on Config Nodes", pipeName);
- TSStatus status = env.getConfigManager().getSyncManager().dropPipe(pipeName);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new PipeException(status.getMessage());
- }
- }
-
- @Override
- SyncOperation getOperation() {
- return SyncOperation.DROP_PIPE;
- }
-
- @Override
- protected boolean isRollbackSupported(OperatePipeState state) {
- return state == OperatePipeState.OPERATE_CHECK;
- }
-
- @Override
- protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeState state)
- throws IOException, InterruptedException, ProcedureException {
- LOGGER.info("Roll back DropPipeProcedure at STATE [{}]", state);
- if (state == OperatePipeState.OPERATE_CHECK) {
- env.getConfigManager().getSyncManager().unlockSyncMetadata();
- } else {
- LOGGER.error("Unsupported roll back STATE [{}]", state);
- }
+ protected boolean abort(ConfigNodeProcedureEnv configNodeProcedureEnv) {
+ return false;
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java
index d7e68434d9..658d319cbd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java
@@ -18,35 +18,24 @@
*/
package org.apache.iotdb.confignode.procedure.impl.sync;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.sync.PipeException;
import org.apache.iotdb.commons.sync.pipe.PipeInfo;
-import org.apache.iotdb.commons.sync.pipe.PipeStatus;
-import org.apache.iotdb.commons.sync.pipe.SyncOperation;
-import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
-import org.apache.iotdb.confignode.procedure.state.sync.OperatePipeState;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.HashSet;
-import java.util.Map;
import java.util.Objects;
import java.util.Set;
-public class StartPipeProcedure extends AbstractOperatePipeProcedure {
- private static final Logger LOGGER = LoggerFactory.getLogger(StartPipeProcedure.class);
+// Empty procedure for old sync, used only for compatibility
+public class StartPipeProcedure extends Procedure<ConfigNodeProcedureEnv> {
private String pipeName;
private PipeInfo pipeInfo;
@@ -56,119 +45,20 @@ public class StartPipeProcedure extends AbstractOperatePipeProcedure {
super();
}
- public StartPipeProcedure(String pipeName) throws PipeException {
- super();
- this.pipeName = pipeName;
- }
-
- @TestOnly
- public void setPipeInfo(PipeInfo pipeInfo) {
- this.pipeInfo = pipeInfo;
- }
-
- @TestOnly
- public void setExecutedDataNodeIds(Set<Integer> executedDataNodeIds) {
- this.executedDataNodeIds = executedDataNodeIds;
- }
-
@Override
- boolean executeCheckCanSkip(ConfigNodeProcedureEnv env) throws PipeException {
- LOGGER.info("Start to start PIPE [{}]", pipeName);
- pipeInfo = env.getConfigManager().getSyncManager().getPipeInfo(pipeName);
- if (pipeInfo.getStatus().equals(PipeStatus.DROP)) {
- throw new PipeException(
- String.format("PIPE [%s] has been dropped and cannot be started again.", pipeName));
- }
- return pipeInfo.getStatus().equals(PipeStatus.RUNNING);
+ protected Procedure<ConfigNodeProcedureEnv>[] execute(
+ ConfigNodeProcedureEnv configNodeProcedureEnv)
+ throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+ return new Procedure[0];
}
@Override
- void executePreOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
- LOGGER.info("Start to pre-start PIPE [{}] on Config Nodes", pipeName);
- TSStatus status =
- env.getConfigManager().getSyncManager().setPipeStatus(pipeName, PipeStatus.PARTIAL_START);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new PipeException(status.getMessage());
- }
- }
+ protected void rollback(ConfigNodeProcedureEnv configNodeProcedureEnv)
+ throws IOException, InterruptedException, ProcedureException {}
@Override
- void executeOperatePipeOnDataNode(ConfigNodeProcedureEnv env) throws PipeException {
- LOGGER.info("Start to broadcast start PIPE [{}] on Data Nodes", pipeName);
- Map<Integer, TSStatus> responseMap =
- env.getConfigManager()
- .getSyncManager()
- .operatePipeOnDataNodes(pipeName, SyncOperation.START_PIPE);
- TSStatus status = RpcUtils.squashResponseStatusList(new ArrayList<>(responseMap.values()));
- executedDataNodeIds.addAll(responseMap.keySet());
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new PipeException(
- String.format(
- "Fail to start PIPE [%s] because %s.",
- pipeName,
- StringUtils.join(
- responseMap.values().stream()
- .filter(i -> i.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode())
- .map(TSStatus::getMessage)
- .toArray(),
- ", ")));
- }
- }
-
- @Override
- void executeOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
- LOGGER.info("Start to start PIPE [{}] on Config Nodes", pipeName);
- TSStatus status =
- env.getConfigManager().getSyncManager().setPipeStatus(pipeName, PipeStatus.RUNNING);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new PipeException(status.getMessage());
- }
- }
-
- @Override
- SyncOperation getOperation() {
- return SyncOperation.START_PIPE;
- }
-
- @Override
- protected boolean isRollbackSupported(OperatePipeState state) {
- switch (state) {
- case OPERATE_CHECK:
- case PRE_OPERATE_PIPE_CONFIGNODE:
- case OPERATE_PIPE_DATANODE:
- return true;
- default:
- return false;
- }
- }
-
- @Override
- protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeState state)
- throws IOException, InterruptedException, ProcedureException {
- LOGGER.info("Roll back StartPipeProcedure at STATE [{}]", state);
- switch (state) {
- case OPERATE_CHECK:
- env.getConfigManager().getSyncManager().unlockSyncMetadata();
- break;
- case PRE_OPERATE_PIPE_CONFIGNODE:
- TSStatus status =
- env.getConfigManager().getSyncManager().setPipeStatus(pipeName, PipeStatus.STOP);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new ProcedureException(
- String.format(
- "Failed to start pipe and failed to roll back because %s. Please execute [STOP PIPE %s] manually.",
- status.getMessage(), pipeName));
- }
- break;
- case OPERATE_PIPE_DATANODE:
- env.getConfigManager()
- .getSyncManager()
- .operatePipeOnDataNodesForRollback(
- pipeName, pipeInfo.getCreateTime(), SyncOperation.STOP_PIPE, executedDataNodeIds);
- break;
- default:
- LOGGER.error("Unsupported roll back STATE [{}]", state);
- }
+ protected boolean abort(ConfigNodeProcedureEnv configNodeProcedureEnv) {
+ return false;
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StopPipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StopPipeProcedure.java
index 0f15b04840..88f4d61edd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StopPipeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StopPipeProcedure.java
@@ -18,35 +18,24 @@
*/
package org.apache.iotdb.confignode.procedure.impl.sync;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.sync.PipeException;
import org.apache.iotdb.commons.sync.pipe.PipeInfo;
-import org.apache.iotdb.commons.sync.pipe.PipeStatus;
-import org.apache.iotdb.commons.sync.pipe.SyncOperation;
-import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
-import org.apache.iotdb.confignode.procedure.state.sync.OperatePipeState;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.HashSet;
-import java.util.Map;
import java.util.Objects;
import java.util.Set;
-public class StopPipeProcedure extends AbstractOperatePipeProcedure {
- private static final Logger LOGGER = LoggerFactory.getLogger(StopPipeProcedure.class);
+// Empty procedure for old sync, used only for compatibility
+public class StopPipeProcedure extends Procedure<ConfigNodeProcedureEnv> {
private String pipeName;
private PipeInfo pipeInfo;
@@ -56,119 +45,20 @@ public class StopPipeProcedure extends AbstractOperatePipeProcedure {
super();
}
- public StopPipeProcedure(String pipeName) throws PipeException {
- super();
- this.pipeName = pipeName;
- }
-
- @TestOnly
- public void setPipeInfo(PipeInfo pipeInfo) {
- this.pipeInfo = pipeInfo;
- }
-
- @TestOnly
- public void setExecutedDataNodeIds(Set<Integer> executedDataNodeIds) {
- this.executedDataNodeIds = executedDataNodeIds;
- }
-
@Override
- boolean executeCheckCanSkip(ConfigNodeProcedureEnv env) throws PipeException {
- LOGGER.info("Start to stop PIPE [{}]", pipeName);
- pipeInfo = env.getConfigManager().getSyncManager().getPipeInfo(pipeName);
- if (pipeInfo.getStatus().equals(PipeStatus.DROP)) {
- throw new PipeException(
- String.format("PIPE [%s] has been dropped and cannot be stopped again.", pipeName));
- }
- return pipeInfo.getStatus().equals(PipeStatus.STOP);
+ protected Procedure<ConfigNodeProcedureEnv>[] execute(
+ ConfigNodeProcedureEnv configNodeProcedureEnv)
+ throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+ return new Procedure[0];
}
@Override
- void executePreOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
- LOGGER.info("Start to pre-stop PIPE [{}] on Config Nodes", pipeName);
- TSStatus status =
- env.getConfigManager().getSyncManager().setPipeStatus(pipeName, PipeStatus.PARTIAL_STOP);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new PipeException(status.getMessage());
- }
- }
+ protected void rollback(ConfigNodeProcedureEnv configNodeProcedureEnv)
+ throws IOException, InterruptedException, ProcedureException {}
@Override
- void executeOperatePipeOnDataNode(ConfigNodeProcedureEnv env) throws PipeException {
- LOGGER.info("Start to broadcast stop PIPE [{}] on Data Nodes", pipeName);
- Map<Integer, TSStatus> responseMap =
- env.getConfigManager()
- .getSyncManager()
- .operatePipeOnDataNodes(pipeName, SyncOperation.STOP_PIPE);
- TSStatus status = RpcUtils.squashResponseStatusList(new ArrayList<>(responseMap.values()));
- executedDataNodeIds.addAll(responseMap.keySet());
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new PipeException(
- String.format(
- "Fail to stop PIPE [%s] because %s.",
- pipeName,
- StringUtils.join(
- responseMap.values().stream()
- .filter(i -> i.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode())
- .map(TSStatus::getMessage)
- .toArray(),
- ", ")));
- }
- }
-
- @Override
- void executeOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
- LOGGER.info("Start to stop PIPE [{}] on Config Nodes", pipeName);
- TSStatus status =
- env.getConfigManager().getSyncManager().setPipeStatus(pipeName, PipeStatus.STOP);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new PipeException(status.getMessage());
- }
- }
-
- @Override
- SyncOperation getOperation() {
- return SyncOperation.STOP_PIPE;
- }
-
- @Override
- protected boolean isRollbackSupported(OperatePipeState state) {
- switch (state) {
- case OPERATE_CHECK:
- case PRE_OPERATE_PIPE_CONFIGNODE:
- case OPERATE_PIPE_DATANODE:
- return true;
- default:
- return false;
- }
- }
-
- @Override
- protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeState state)
- throws IOException, InterruptedException, ProcedureException {
- LOGGER.info("Roll back StopPipeProcedure at STATE [{}]", state);
- switch (state) {
- case OPERATE_CHECK:
- env.getConfigManager().getSyncManager().unlockSyncMetadata();
- break;
- case PRE_OPERATE_PIPE_CONFIGNODE:
- TSStatus status =
- env.getConfigManager().getSyncManager().setPipeStatus(pipeName, PipeStatus.RUNNING);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new ProcedureException(
- String.format(
- "Failed to stop pipe and failed to roll back because %s. Please execute [STOP PIPE %s] manually.",
- status.getMessage(), pipeName));
- }
- break;
- case OPERATE_PIPE_DATANODE:
- env.getConfigManager()
- .getSyncManager()
- .operatePipeOnDataNodesForRollback(
- pipeName, pipeInfo.getCreateTime(), SyncOperation.START_PIPE, executedDataNodeIds);
- break;
- default:
- LOGGER.error("Unsupported roll back STATE [{}]", state);
- }
+ protected boolean abort(ConfigNodeProcedureEnv configNodeProcedureEnv) {
+ return false;
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/sync/OperatePipeState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/pipe/task/OperatePipeTaskState.java
similarity index 80%
rename from confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/sync/OperatePipeState.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/pipe/task/OperatePipeTaskState.java
index c8a263ec43..fdbc65fe4d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/sync/OperatePipeState.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/pipe/task/OperatePipeTaskState.java
@@ -16,11 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.procedure.state.sync;
-public enum OperatePipeState {
- OPERATE_CHECK,
- PRE_OPERATE_PIPE_CONFIGNODE,
- OPERATE_PIPE_DATANODE,
- OPERATE_PIPE_CONFIGNODE
+package org.apache.iotdb.confignode.procedure.state.pipe.task;
+
+public enum OperatePipeTaskState {
+ VALIDATE_TASK,
+ CALCULATE_INFO_FOR_TASK,
+ OPERATE_ON_DATA_NODES,
+ WRITE_CONFIG_NODE_CONSENSUS
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index 9ef0b7422d..7e016c4dc8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -28,6 +28,10 @@ import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure
import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
+import org.apache.iotdb.confignode.procedure.impl.pipe.task.CreatePipeProcedureV2;
+import org.apache.iotdb.confignode.procedure.impl.pipe.task.DropPipeProcedureV2;
+import org.apache.iotdb.confignode.procedure.impl.pipe.task.StartPipeProcedureV2;
+import org.apache.iotdb.confignode.procedure.impl.pipe.task.StopPipeProcedureV2;
import org.apache.iotdb.confignode.procedure.impl.schema.DeactivateTemplateProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.DeleteDatabaseProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.DeleteTimeSeriesProcedure;
@@ -102,6 +106,18 @@ public class ProcedureFactory implements IProcedureFactory {
case DROP_PIPE_PROCEDURE:
procedure = new DropPipeProcedure();
break;
+ case CREATE_PIPE_PROCEDURE_V2:
+ procedure = new CreatePipeProcedureV2();
+ break;
+ case START_PIPE_PROCEDURE_V2:
+ procedure = new StartPipeProcedureV2();
+ break;
+ case STOP_PIPE_PROCEDURE_V2:
+ procedure = new StopPipeProcedureV2();
+ break;
+ case DROP_PIPE_PROCEDURE_V2:
+ procedure = new DropPipeProcedureV2();
+ break;
case CREATE_CQ_PROCEDURE:
procedure =
new CreateCQProcedure(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
index c5d6054646..74869f51d3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
@@ -44,7 +44,7 @@ public enum ProcedureType {
CREATE_TRIGGER_PROCEDURE((short) 400),
DROP_TRIGGER_PROCEDURE((short) 401),
- /** Sync */
+ /** Old sync */
CREATE_PIPE_PROCEDURE((short) 500),
START_PIPE_PROCEDURE((short) 501),
STOP_PIPE_PROCEDURE((short) 502),
@@ -63,7 +63,13 @@ public enum ProcedureType {
/** Pipe Plugin */
CREATE_PIPE_PLUGIN_PROCEDURE((short) 900),
- DROP_PIPE_PLUGIN_PROCEDURE((short) 901);
+ DROP_PIPE_PLUGIN_PROCEDURE((short) 901),
+
+ /** Pipe Task */
+ CREATE_PIPE_PROCEDURE_V2((short) 1001),
+ START_PIPE_PROCEDURE_V2((short) 1002),
+ STOP_PIPE_PROCEDURE_V2((short) 1003),
+ DROP_PIPE_PROCEDURE_V2((short) 1004);
private final short typeCode;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 39975c550d..2e80a9b0d1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -55,8 +55,6 @@ import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaRep
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
import org.apache.iotdb.confignode.consensus.response.auth.PermissionInfoResp;
import org.apache.iotdb.confignode.consensus.response.database.CountDatabaseResp;
import org.apache.iotdb.confignode.consensus.response.database.DatabaseSchemaResp;
@@ -853,18 +851,24 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
}
@Override
+ @Deprecated
public TSStatus createPipeSink(TPipeSinkInfo req) {
- return configManager.createPipeSink(new CreatePipeSinkPlan(req));
+ // To be deleted
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@Override
+ @Deprecated
public TSStatus dropPipeSink(TDropPipeSinkReq req) {
- return configManager.dropPipeSink(new DropPipeSinkPlan(req.getPipeSinkName()));
+ // To be deleted
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@Override
+ @Deprecated
public TGetPipeSinkResp getPipeSink(TGetPipeSinkReq req) {
- return configManager.getPipeSink(req);
+ // To be deleted
+ return new TGetPipeSinkResp();
}
@Override
@@ -893,11 +897,13 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
}
@Override
+ @Deprecated
public TGetAllPipeInfoResp getAllPipeInfo() {
return configManager.getAllPipeInfo();
}
@Override
+ @Deprecated
public TSStatus recordPipeMessage(TRecordPipeMessageReq req) {
return configManager.recordPipeMessage(req);
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 83c26ce16e..203e9f6a36 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -39,7 +39,12 @@ import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
import org.apache.iotdb.commons.partition.SeriesPartitionTable;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeConsensusGroupTaskMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.commons.sync.pipe.PipeMessage;
import org.apache.iotdb.commons.sync.pipe.PipeStatus;
import org.apache.iotdb.commons.sync.pipe.TsFilePipeInfo;
import org.apache.iotdb.commons.trigger.TriggerInformation;
@@ -85,6 +90,11 @@ import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNo
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
import org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
import org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
@@ -93,12 +103,13 @@ import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGr
import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.PollRegionMaintainTaskPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRegionMaintainTaskPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlanV1;
+import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlanV1;
+import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlanV1;
+import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlanV1;
+import org.apache.iotdb.confignode.consensus.request.write.sync.RecordPipeMessagePlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlanV1;
+import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlanV1;
import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.DropSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.PreUnsetSchemaTemplatePlan;
@@ -954,9 +965,9 @@ public class ConfigPhysicalPlanSerDeTest {
.setPipeSinkName("demo")
.setPipeSinkType("IoTDB")
.setAttributes(attributes);
- CreatePipeSinkPlan createPipeSinkPlan = new CreatePipeSinkPlan(pipeSinkInfo);
- CreatePipeSinkPlan createPipeSinkPlan1 =
- (CreatePipeSinkPlan)
+ CreatePipeSinkPlanV1 createPipeSinkPlan = new CreatePipeSinkPlanV1(pipeSinkInfo);
+ CreatePipeSinkPlanV1 createPipeSinkPlan1 =
+ (CreatePipeSinkPlanV1)
ConfigPhysicalPlan.Factory.create(createPipeSinkPlan.serializeToByteBuffer());
Assert.assertEquals(
createPipeSinkPlan.getPipeSinkInfo(), createPipeSinkPlan1.getPipeSinkInfo());
@@ -964,23 +975,23 @@ public class ConfigPhysicalPlanSerDeTest {
@Test
public void DropPipeSinkPlanTest() throws IOException {
- DropPipeSinkPlan dropPipeSinkPlan = new DropPipeSinkPlan("demo");
- DropPipeSinkPlan dropPipeSinkPlan1 =
- (DropPipeSinkPlan)
+ DropPipeSinkPlanV1 dropPipeSinkPlan = new DropPipeSinkPlanV1("demo");
+ DropPipeSinkPlanV1 dropPipeSinkPlan1 =
+ (DropPipeSinkPlanV1)
ConfigPhysicalPlan.Factory.create(dropPipeSinkPlan.serializeToByteBuffer());
Assert.assertEquals(dropPipeSinkPlan.getPipeSinkName(), dropPipeSinkPlan1.getPipeSinkName());
}
@Test
public void GetPipeSinkPlanTest() throws IOException {
- GetPipeSinkPlan getPipeSinkPlan = new GetPipeSinkPlan("demo");
- GetPipeSinkPlan getPipeSinkPlan1 =
- (GetPipeSinkPlan)
+ GetPipeSinkPlanV1 getPipeSinkPlan = new GetPipeSinkPlanV1("demo");
+ GetPipeSinkPlanV1 getPipeSinkPlan1 =
+ (GetPipeSinkPlanV1)
ConfigPhysicalPlan.Factory.create(getPipeSinkPlan.serializeToByteBuffer());
Assert.assertEquals(getPipeSinkPlan.getPipeSinkName(), getPipeSinkPlan1.getPipeSinkName());
- GetPipeSinkPlan getPipeSinkPlanWithNullName = new GetPipeSinkPlan();
- GetPipeSinkPlan getPipeSinkPlanWithNullName1 =
- (GetPipeSinkPlan)
+ GetPipeSinkPlanV1 getPipeSinkPlanWithNullName = new GetPipeSinkPlanV1();
+ GetPipeSinkPlanV1 getPipeSinkPlanWithNullName1 =
+ (GetPipeSinkPlanV1)
ConfigPhysicalPlan.Factory.create(getPipeSinkPlanWithNullName.serializeToByteBuffer());
Assert.assertEquals(
getPipeSinkPlanWithNullName.getPipeSinkName(),
@@ -992,37 +1003,120 @@ public class ConfigPhysicalPlanSerDeTest {
PipeInfo pipeInfo =
new TsFilePipeInfo(
"name", "demo", PipeStatus.PARTIAL_CREATE, System.currentTimeMillis(), 999, false);
- PreCreatePipePlan PreCreatePipePlan = new PreCreatePipePlan(pipeInfo);
- PreCreatePipePlan PreCreatePipePlan1 =
- (PreCreatePipePlan)
+ PreCreatePipePlanV1 PreCreatePipePlan = new PreCreatePipePlanV1(pipeInfo);
+ PreCreatePipePlanV1 PreCreatePipePlan1 =
+ (PreCreatePipePlanV1)
ConfigPhysicalPlan.Factory.create(PreCreatePipePlan.serializeToByteBuffer());
Assert.assertEquals(PreCreatePipePlan.getPipeInfo(), PreCreatePipePlan1.getPipeInfo());
}
@Test
- public void SetPipeStatusPlan() throws IOException {
- SetPipeStatusPlan setPipeStatusPlan = new SetPipeStatusPlan("pipe", PipeStatus.PARTIAL_CREATE);
- SetPipeStatusPlan setPipeStatusPlan1 =
- (SetPipeStatusPlan)
+ public void RecordPipeMessagePlanTest() throws IOException {
+ RecordPipeMessagePlan recordPipeMessagePlan =
+ new RecordPipeMessagePlan(
+ "testPipe", new PipeMessage(PipeMessage.PipeMessageType.ERROR, "testError"));
+ RecordPipeMessagePlan recordPipeMessagePlan1 =
+ (RecordPipeMessagePlan)
+ ConfigPhysicalPlan.Factory.create(recordPipeMessagePlan.serializeToByteBuffer());
+ Assert.assertEquals(recordPipeMessagePlan.getPipeName(), recordPipeMessagePlan1.getPipeName());
+ Assert.assertEquals(
+ recordPipeMessagePlan.getPipeMessage().getType(),
+ recordPipeMessagePlan1.getPipeMessage().getType());
+ Assert.assertEquals(
+ recordPipeMessagePlan.getPipeMessage().getMessage(),
+ recordPipeMessagePlan1.getPipeMessage().getMessage());
+ }
+
+ @Test
+ public void SetPipeStatusPlanTest() throws IOException {
+ SetPipeStatusPlanV1 setPipeStatusPlan =
+ new SetPipeStatusPlanV1("pipe", PipeStatus.PARTIAL_CREATE);
+ SetPipeStatusPlanV1 setPipeStatusPlan1 =
+ (SetPipeStatusPlanV1)
ConfigPhysicalPlan.Factory.create(setPipeStatusPlan.serializeToByteBuffer());
Assert.assertEquals(setPipeStatusPlan.getPipeName(), setPipeStatusPlan1.getPipeName());
Assert.assertEquals(setPipeStatusPlan.getPipeStatus(), setPipeStatusPlan1.getPipeStatus());
}
+ @Test
+ public void CreatePipePlanV2Test() throws IOException {
+ Map<String, String> collectorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+ collectorAttributes.put("collector", "org.apache.iotdb.pipe.collector.DefaultCollector");
+ processorAttributes.put("processor", "org.apache.iotdb.pipe.processor.SDTFilterProcessor");
+ connectorAttributes.put("connector", "org.apache.iotdb.pipe.protocal.ThriftTransporter");
+ PipeConsensusGroupTaskMeta pipeConsensusGroupTaskMeta = new PipeConsensusGroupTaskMeta(0, 1);
+ Map<TConsensusGroupId, PipeConsensusGroupTaskMeta> pipeTasks = new HashMap<>();
+ pipeTasks.put(new TConsensusGroupId(DataRegion, 1), pipeConsensusGroupTaskMeta);
+ PipeStaticMeta pipeStaticMeta =
+ new PipeStaticMeta(
+ "testPipe", 121, collectorAttributes, processorAttributes, connectorAttributes);
+ PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks);
+ CreatePipePlanV2 createPipePlanV2 = new CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta);
+ CreatePipePlanV2 createPipePlanV21 =
+ (CreatePipePlanV2)
+ ConfigPhysicalPlan.Factory.create(createPipePlanV2.serializeToByteBuffer());
+ Assert.assertEquals(
+ createPipePlanV2.getPipeStaticMeta(), createPipePlanV21.getPipeStaticMeta());
+ }
+
+ @Test
+ public void SetPipeStatusPlanV2Test() throws IOException {
+ SetPipeStatusPlanV2 setPipeStatusPlanV2 =
+ new SetPipeStatusPlanV2("pipe", org.apache.iotdb.commons.pipe.task.meta.PipeStatus.RUNNING);
+ SetPipeStatusPlanV2 setPipeStatusPlanV21 =
+ (SetPipeStatusPlanV2)
+ ConfigPhysicalPlan.Factory.create(setPipeStatusPlanV2.serializeToByteBuffer());
+ Assert.assertEquals(setPipeStatusPlanV2.getPipeName(), setPipeStatusPlanV21.getPipeName());
+ Assert.assertEquals(setPipeStatusPlanV2.getPipeStatus(), setPipeStatusPlanV21.getPipeStatus());
+ }
+
+ @Test
+ public void DropPipePlanV2Test() throws IOException {
+ DropPipePlanV2 dropPipePlanV2 = new DropPipePlanV2("demo");
+ DropPipePlanV2 dropPipePlanV21 =
+ (DropPipePlanV2) ConfigPhysicalPlan.Factory.create(dropPipePlanV2.serializeToByteBuffer());
+ Assert.assertEquals(dropPipePlanV2.getPipeName(), dropPipePlanV21.getPipeName());
+ }
+
@Test
public void ShowPipePlanTest() throws IOException {
- ShowPipePlan showPipePlan = new ShowPipePlan("demo");
- ShowPipePlan showPipePlan1 =
- (ShowPipePlan) ConfigPhysicalPlan.Factory.create(showPipePlan.serializeToByteBuffer());
+ ShowPipePlanV1 showPipePlan = new ShowPipePlanV1("demo");
+ ShowPipePlanV1 showPipePlan1 =
+ (ShowPipePlanV1) ConfigPhysicalPlan.Factory.create(showPipePlan.serializeToByteBuffer());
Assert.assertEquals(showPipePlan.getPipeName(), showPipePlan1.getPipeName());
- ShowPipePlan showPipePlanWithNullName = new ShowPipePlan();
- ShowPipePlan showPipePlanWithNullName1 =
- (ShowPipePlan)
+ ShowPipePlanV1 showPipePlanWithNullName = new ShowPipePlanV1();
+ ShowPipePlanV1 showPipePlanWithNullName1 =
+ (ShowPipePlanV1)
ConfigPhysicalPlan.Factory.create(showPipePlanWithNullName.serializeToByteBuffer());
Assert.assertEquals(
showPipePlanWithNullName.getPipeName(), showPipePlanWithNullName1.getPipeName());
}
+ @Test
+ public void CreatePipePluginPlanTest() throws IOException {
+ CreatePipePluginPlan createPipePluginPlan =
+ new CreatePipePluginPlan(
+ new PipePluginMeta("testPlugin", "org.apache.iotdb.testJar", "testJar", "???"),
+ new Binary("123"));
+ CreatePipePluginPlan createPipePluginPlan1 =
+ (CreatePipePluginPlan)
+ ConfigPhysicalPlan.Factory.create(createPipePluginPlan.serializeToByteBuffer());
+ Assert.assertEquals(
+ createPipePluginPlan.getPipePluginMeta(), createPipePluginPlan1.getPipePluginMeta());
+ Assert.assertEquals(createPipePluginPlan.getJarFile(), createPipePluginPlan1.getJarFile());
+ }
+
+ @Test
+ public void DropPipePluginPlanTest() throws IOException {
+ DropPipePluginPlan dropPipePluginPlan = new DropPipePluginPlan("testPlugin");
+ DropPipePluginPlan dropPipePluginPlan1 =
+ (DropPipePluginPlan)
+ ConfigPhysicalPlan.Factory.create(dropPipePluginPlan.serializeToByteBuffer());
+ Assert.assertEquals(dropPipePluginPlan.getPluginName(), dropPipePluginPlan1.getPluginName());
+ }
+
@Test
public void GetTriggerTablePlanTest() throws IOException {
GetTriggerTablePlan getTriggerTablePlan0 = new GetTriggerTablePlan(true);
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSyncInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSyncInfoTest.java
deleted file mode 100644
index d69eeec641..0000000000
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSyncInfoTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.persistence;
-
-import org.apache.iotdb.commons.exception.sync.PipeSinkException;
-import org.apache.iotdb.commons.sync.pipesink.PipeSink;
-import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
-import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
-import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
-
-import org.apache.commons.io.FileUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.iotdb.db.constant.TestConstant.BASE_OUTPUT_PATH;
-
-public class ClusterSyncInfoTest {
-
- private ClusterSyncInfo clusterSyncInfo;
- private static final File snapshotDir = new File(BASE_OUTPUT_PATH, "snapshot");
-
- @Before
- public void setup() throws IOException {
- clusterSyncInfo = new ClusterSyncInfo();
- if (!snapshotDir.exists()) {
- snapshotDir.mkdirs();
- }
- }
-
- @After
- public void cleanup() throws IOException {
- if (snapshotDir.exists()) {
- FileUtils.deleteDirectory(snapshotDir);
- }
- }
-
- private void prepareClusterSyncInfo() {
- Map<String, String> attributes1 = new HashMap<>();
- attributes1.put("ip", "192.168.11.11");
- attributes1.put("port", "7766");
- TPipeSinkInfo pipeSinkInfo1 =
- new TPipeSinkInfo()
- .setPipeSinkName("demo1")
- .setPipeSinkType("IoTDB")
- .setAttributes(attributes1);
- Map<String, String> attributes2 = new HashMap<>();
- attributes2.put("ip", "192.168.22.2");
- attributes2.put("port", "7777");
- TPipeSinkInfo pipeSinkInfo2 =
- new TPipeSinkInfo()
- .setPipeSinkName("demo2")
- .setPipeSinkType("IoTDB")
- .setAttributes(attributes2);
-
- clusterSyncInfo.addPipeSink(new CreatePipeSinkPlan(pipeSinkInfo1));
- clusterSyncInfo.addPipeSink(new CreatePipeSinkPlan(pipeSinkInfo2));
- }
-
- @Test
- public void testEmptySnapshot() throws Exception {
- // test empty snapshot
- Assert.assertTrue(clusterSyncInfo.processTakeSnapshot(snapshotDir));
- ClusterSyncInfo clusterSyncInfo2 = new ClusterSyncInfo();
- clusterSyncInfo2.processLoadSnapshot(snapshotDir);
-
- List<PipeSink> expectedPipeSink =
- clusterSyncInfo.getPipeSink(new GetPipeSinkPlan()).getPipeSinkList();
- List<PipeSink> actualPipeSink =
- clusterSyncInfo2.getPipeSink(new GetPipeSinkPlan()).getPipeSinkList();
- Assert.assertEquals(expectedPipeSink, actualPipeSink);
- }
-
- @Test
- public void testSnapshot() throws Exception {
- // test snapshot with data
- prepareClusterSyncInfo();
- Assert.assertTrue(clusterSyncInfo.processTakeSnapshot(snapshotDir));
- ClusterSyncInfo clusterSyncInfo2 = new ClusterSyncInfo();
- clusterSyncInfo2.processLoadSnapshot(snapshotDir);
-
- List<PipeSink> expectedPipeSink =
- clusterSyncInfo.getPipeSink(new GetPipeSinkPlan()).getPipeSinkList();
- List<PipeSink> actualPipeSink =
- clusterSyncInfo2.getPipeSink(new GetPipeSinkPlan()).getPipeSinkList();
- Assert.assertEquals(expectedPipeSink, actualPipeSink);
- }
-
- @Test
- public void testPipeSinkOperation() {
- prepareClusterSyncInfo();
- Map<String, String> attributes1 = new HashMap<>();
- attributes1.put("ip", "192.168.11.11");
- attributes1.put("port", "7766");
- Map<String, String> attributes2 = new HashMap<>();
- attributes2.put("ip", "Nonstandard");
- attributes2.put("port", "7777");
- TPipeSinkInfo alreadyExistSink =
- new TPipeSinkInfo()
- .setPipeSinkName("demo1")
- .setPipeSinkType("IoTDB")
- .setAttributes(attributes1);
- TPipeSinkInfo errorAttributeSink =
- new TPipeSinkInfo()
- .setPipeSinkName("demo3")
- .setPipeSinkType("IoTDB")
- .setAttributes(attributes2);
- TPipeSinkInfo nonExistSink =
- new TPipeSinkInfo()
- .setPipeSinkName("demo3")
- .setPipeSinkType("IoTDB")
- .setAttributes(attributes1);
-
- try {
- clusterSyncInfo.checkAddPipeSink(new CreatePipeSinkPlan(alreadyExistSink));
- Assert.fail("checkOperatePipeSink ignore failure.");
- } catch (PipeSinkException e) {
- // nothing
- }
- try {
- clusterSyncInfo.checkAddPipeSink(new CreatePipeSinkPlan(alreadyExistSink));
- Assert.fail("checkOperatePipeSink ignore failure.");
- } catch (PipeSinkException e) {
- // nothing
- }
- try {
- clusterSyncInfo.checkAddPipeSink(new CreatePipeSinkPlan(errorAttributeSink));
- Assert.fail("checkOperatePipeSink ignore failure.");
- } catch (PipeSinkException e) {
- // nothing
- }
-
- try {
- clusterSyncInfo.checkAddPipeSink(new CreatePipeSinkPlan(nonExistSink));
- clusterSyncInfo.checkDropPipeSink("demo1");
- } catch (PipeSinkException e) {
- Assert.fail("checkOperatePipeSink should not throw exception.");
- }
- }
-}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
new file mode 100644
index 0000000000..6384652baa
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeConsensusGroupTaskMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
+import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.iotdb.common.rpc.thrift.TConsensusGroupType.DataRegion;
+import static org.apache.iotdb.db.constant.TestConstant.BASE_OUTPUT_PATH;
+
+public class PipeInfoTest {
+
+ private static PipeInfo pipeInfo;
+ private static final File snapshotDir = new File(BASE_OUTPUT_PATH, "snapshot");
+
+ @Before
+ public void setup() throws IOException {
+ pipeInfo = new PipeInfo();
+ if (!snapshotDir.exists()) {
+ snapshotDir.mkdirs();
+ }
+ }
+
+ @After
+ public void cleanup() throws IOException {
+ if (snapshotDir.exists()) {
+ FileUtils.deleteDirectory(snapshotDir);
+ }
+ }
+
+ @Test
+ public void testSnapshot() throws TException, IOException {
+ Map<String, String> collectorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+ collectorAttributes.put("collector", "org.apache.iotdb.pipe.collector.DefaultCollector");
+ processorAttributes.put("processor", "org.apache.iotdb.pipe.processor.SDTFilterProcessor");
+ connectorAttributes.put("connector", "org.apache.iotdb.pipe.protocal.ThriftTransporter");
+ PipeConsensusGroupTaskMeta pipeConsensusGroupTaskMeta = new PipeConsensusGroupTaskMeta(0, 1);
+ Map<TConsensusGroupId, PipeConsensusGroupTaskMeta> pipeTasks = new HashMap<>();
+ pipeTasks.put(new TConsensusGroupId(DataRegion, 1), pipeConsensusGroupTaskMeta);
+ PipeStaticMeta pipeStaticMeta =
+ new PipeStaticMeta(
+ "testPipe", 121, collectorAttributes, processorAttributes, connectorAttributes);
+ PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks);
+ CreatePipePlanV2 createPipePlanV2 = new CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta);
+ pipeInfo.getPipeTaskInfo().createPipe(createPipePlanV2);
+
+ CreatePipePluginPlan createPipePluginPlan =
+ new CreatePipePluginPlan(
+ new PipePluginMeta("testPlugin", "org.apache.iotdb.testJar", "testJar", "???"),
+ new Binary("123"));
+ pipeInfo.getPipePluginInfo().createPipePlugin(createPipePluginPlan);
+
+ pipeInfo.processTakeSnapshot(snapshotDir);
+
+ PipeInfo pipeInfo1 = new PipeInfo();
+ pipeInfo1.processLoadSnapshot(snapshotDir);
+
+ Assert.assertEquals(pipeInfo.toString(), pipeInfo1.toString());
+ Assert.assertEquals(pipeInfo, pipeInfo1);
+ }
+}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/CreatePipePluginProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java
similarity index 93%
copy from confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/CreatePipePluginProcedureTest.java
copy to confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java
index cda15ec592..729e3a01f7 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/CreatePipePluginProcedureTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java
@@ -17,10 +17,9 @@
* under the License.
*/
-package org.apache.iotdb.confignode.procedure.impl.pipe;
+package org.apache.iotdb.confignode.procedure.impl.pipe.plugin;
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
-import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedureTest.java
similarity index 92%
copy from confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java
copy to confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedureTest.java
index 02f7c856f2..6135aa0ab6 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedureTest.java
@@ -17,9 +17,8 @@
* under the License.
*/
-package org.apache.iotdb.confignode.procedure.impl.pipe;
+package org.apache.iotdb.confignode.procedure.impl.pipe.plugin;
-import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/CreatePipePluginProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java
similarity index 57%
rename from confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/CreatePipePluginProcedureTest.java
rename to confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java
index cda15ec592..cf151fe595 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/CreatePipePluginProcedureTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java
@@ -17,42 +17,51 @@
* under the License.
*/
-package org.apache.iotdb.confignode.procedure.impl.pipe;
+package org.apache.iotdb.confignode.procedure.impl.pipe.task;
-import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
-import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
-import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.junit.Test;
import java.io.DataOutputStream;
import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-public class CreatePipePluginProcedureTest {
+public class CreatePipeProcedureV2Test {
@Test
public void serializeDeserializeTest() {
PublicBAOS byteArrayOutputStream = new PublicBAOS();
DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
- PipePluginMeta pipePluginMeta =
- new PipePluginMeta("test", "test.class", "test.jar", "testMD5test");
- CreatePipePluginProcedure proc =
- new CreatePipePluginProcedure(pipePluginMeta, new byte[] {1, 2, 3});
+ Map<String, String> collectorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+ collectorAttributes.put("collector", "org.apache.iotdb.pipe.collector.DefaultCollector");
+ processorAttributes.put("processor", "org.apache.iotdb.pipe.processor.SDTFilterProcessor");
+ connectorAttributes.put("connector", "org.apache.iotdb.pipe.protocal.ThriftTransporter");
+
+ CreatePipeProcedureV2 proc =
+ new CreatePipeProcedureV2(
+ new TCreatePipeReq()
+ .setPipeName("testPipe")
+ .setCollectorAttributes(collectorAttributes)
+ .setProcessorAttributes(processorAttributes)
+ .setConnectorAttributes(connectorAttributes));
try {
proc.serialize(outputStream);
ByteBuffer buffer =
ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
- CreatePipePluginProcedure proc2 =
- (CreatePipePluginProcedure) ProcedureFactory.getInstance().create(buffer);
+ CreatePipeProcedureV2 proc2 =
+ (CreatePipeProcedureV2) ProcedureFactory.getInstance().create(buffer);
assertEquals(proc, proc2);
- assertEquals(new Binary(proc.getJarFile()), new Binary(proc2.getJarFile()));
} catch (Exception e) {
fail();
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2Test.java
similarity index 80%
copy from confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java
copy to confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2Test.java
index 02f7c856f2..427d147833 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2Test.java
@@ -17,9 +17,8 @@
* under the License.
*/
-package org.apache.iotdb.confignode.procedure.impl.pipe;
+package org.apache.iotdb.confignode.procedure.impl.pipe.task;
-import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -31,20 +30,21 @@ import java.nio.ByteBuffer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-public class DropPipePluginProcedureTest {
+public class DropPipeProcedureV2Test {
@Test
public void serializeDeserializeTest() {
PublicBAOS byteArrayOutputStream = new PublicBAOS();
DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
- DropPipePluginProcedure proc = new DropPipePluginProcedure("test");
+ DropPipeProcedureV2 proc = new DropPipeProcedureV2("testPipe");
try {
proc.serialize(outputStream);
ByteBuffer buffer =
ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
- DropPipePluginProcedure proc2 =
- (DropPipePluginProcedure) ProcedureFactory.getInstance().create(buffer);
+ DropPipeProcedureV2 proc2 =
+ (DropPipeProcedureV2) ProcedureFactory.getInstance().create(buffer);
+
assertEquals(proc, proc2);
} catch (Exception e) {
fail();
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2Test.java
similarity index 80%
copy from confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java
copy to confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2Test.java
index 02f7c856f2..2f47718514 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2Test.java
@@ -17,9 +17,8 @@
* under the License.
*/
-package org.apache.iotdb.confignode.procedure.impl.pipe;
+package org.apache.iotdb.confignode.procedure.impl.pipe.task;
-import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -31,20 +30,21 @@ import java.nio.ByteBuffer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-public class DropPipePluginProcedureTest {
+public class StartPipeProcedureV2Test {
@Test
public void serializeDeserializeTest() {
PublicBAOS byteArrayOutputStream = new PublicBAOS();
DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
- DropPipePluginProcedure proc = new DropPipePluginProcedure("test");
+ StartPipeProcedureV2 proc = new StartPipeProcedureV2("testPipe");
try {
proc.serialize(outputStream);
ByteBuffer buffer =
ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
- DropPipePluginProcedure proc2 =
- (DropPipePluginProcedure) ProcedureFactory.getInstance().create(buffer);
+ StartPipeProcedureV2 proc2 =
+ (StartPipeProcedureV2) ProcedureFactory.getInstance().create(buffer);
+
assertEquals(proc, proc2);
} catch (Exception e) {
fail();
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2Test.java
similarity index 80%
rename from confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java
rename to confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2Test.java
index 02f7c856f2..c02417cde1 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2Test.java
@@ -17,9 +17,8 @@
* under the License.
*/
-package org.apache.iotdb.confignode.procedure.impl.pipe;
+package org.apache.iotdb.confignode.procedure.impl.pipe.task;
-import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -31,20 +30,21 @@ import java.nio.ByteBuffer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-public class DropPipePluginProcedureTest {
+public class StopPipeProcedureV2Test {
@Test
public void serializeDeserializeTest() {
PublicBAOS byteArrayOutputStream = new PublicBAOS();
DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
- DropPipePluginProcedure proc = new DropPipePluginProcedure("test");
+ StopPipeProcedureV2 proc = new StopPipeProcedureV2("testPipe");
try {
proc.serialize(outputStream);
ByteBuffer buffer =
ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
- DropPipePluginProcedure proc2 =
- (DropPipePluginProcedure) ProcedureFactory.getInstance().create(buffer);
+ StopPipeProcedureV2 proc2 =
+ (StopPipeProcedureV2) ProcedureFactory.getInstance().create(buffer);
+
assertEquals(proc, proc2);
} catch (Exception e) {
fail();
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java
index d112eff23f..c02fdeadb6 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java
@@ -30,12 +30,12 @@ import java.util.Map;
public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
protected final Map<String, String> jarNameToMd5Map;
- protected final Map<String, Integer> jarNameToReferrenceCountMap;
+ protected final Map<String, Integer> jarNameToReferenceCountMap;
public ConfigNodePipePluginMetaKeeper() {
super();
jarNameToMd5Map = new HashMap<>();
- jarNameToReferrenceCountMap = new HashMap<>();
+ jarNameToReferenceCountMap = new HashMap<>();
}
public synchronized boolean containsJar(String jarName) {
@@ -47,22 +47,22 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
}
public synchronized void addJarNameAndMd5(String jarName, String md5) {
- if (jarNameToReferrenceCountMap.containsKey(jarName)) {
- jarNameToReferrenceCountMap.put(jarName, jarNameToReferrenceCountMap.get(jarName) + 1);
+ if (jarNameToReferenceCountMap.containsKey(jarName)) {
+ jarNameToReferenceCountMap.put(jarName, jarNameToReferenceCountMap.get(jarName) + 1);
} else {
- jarNameToReferrenceCountMap.put(jarName, 1);
+ jarNameToReferenceCountMap.put(jarName, 1);
jarNameToMd5Map.put(jarName, md5);
}
}
public synchronized void removeJarNameAndMd5IfPossible(String jarName) {
- if (jarNameToReferrenceCountMap.containsKey(jarName)) {
- int count = jarNameToReferrenceCountMap.get(jarName);
+ if (jarNameToReferenceCountMap.containsKey(jarName)) {
+ int count = jarNameToReferenceCountMap.get(jarName);
if (count == 1) {
- jarNameToReferrenceCountMap.remove(jarName);
+ jarNameToReferenceCountMap.remove(jarName);
jarNameToMd5Map.remove(jarName);
} else {
- jarNameToReferrenceCountMap.put(jarName, count - 1);
+ jarNameToReferenceCountMap.put(jarName, count - 1);
}
}
}
@@ -72,7 +72,7 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
for (Map.Entry<String, String> entry : jarNameToMd5Map.entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
ReadWriteIOUtils.write(entry.getValue(), outputStream);
- ReadWriteIOUtils.write(jarNameToReferrenceCountMap.get(entry.getKey()), outputStream);
+ ReadWriteIOUtils.write(jarNameToReferenceCountMap.get(entry.getKey()), outputStream);
}
ReadWriteIOUtils.write(pipeNameToPipeMetaMap.size(), outputStream);
@@ -90,7 +90,7 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
final String md5 = ReadWriteIOUtils.readString(inputStream);
final int count = ReadWriteIOUtils.readInt(inputStream);
jarNameToMd5Map.put(jarName, md5);
- jarNameToReferrenceCountMap.put(jarName, count);
+ jarNameToReferenceCountMap.put(jarName, count);
}
final int pipePluginMetaSize = ReadWriteIOUtils.readInt(inputStream);
@@ -100,9 +100,9 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
}
}
- private void clear() {
+ public void clear() {
pipeNameToPipeMetaMap.clear();
jarNameToMd5Map.clear();
- jarNameToReferrenceCountMap.clear();
+ jarNameToReferenceCountMap.clear();
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
index d413c1632f..4417141106 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.commons.pipe.plugin.meta;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
public abstract class PipePluginMetaKeeper {
@@ -49,4 +50,25 @@ public abstract class PipePluginMetaKeeper {
public boolean containsPipePlugin(String pluginName) {
return pipeNameToPipeMetaMap.containsKey(pluginName.toUpperCase());
}
+
+ public void clear() {
+ pipeNameToPipeMetaMap.clear();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PipePluginMetaKeeper that = (PipePluginMetaKeeper) o;
+ return pipeNameToPipeMetaMap.equals(that.pipeNameToPipeMetaMap);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(pipeNameToPipeMetaMap);
+ }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeConsensusGroupTaskMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeConsensusGroupTaskMeta.java
new file mode 100644
index 0000000000..39bc9d3c5f
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeConsensusGroupTaskMeta.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.task.meta;
+
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class PipeConsensusGroupTaskMeta {
+
+ // TODO: replace it with consensus index
+ private final AtomicLong index = new AtomicLong(0L);
+ private final AtomicInteger regionLeader = new AtomicInteger(0);
+
+ private PipeConsensusGroupTaskMeta() {}
+
+ public PipeConsensusGroupTaskMeta(long index, int regionLeader) {
+ this.index.set(index);
+ this.regionLeader.set(regionLeader);
+ }
+
+ public long getIndex() {
+ return index.get();
+ }
+
+ public int getRegionLeader() {
+ return regionLeader.get();
+ }
+
+ public void setIndex(long index) {
+ this.index.set(index);
+ }
+
+ public void setRegionLeader(int regionLeader) {
+ this.regionLeader.set(regionLeader);
+ }
+
+ public void serialize(DataOutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write(index.get(), outputStream);
+ ReadWriteIOUtils.write(regionLeader.get(), outputStream);
+ }
+
+ public static PipeConsensusGroupTaskMeta deserialize(ByteBuffer byteBuffer) {
+ final PipeConsensusGroupTaskMeta PipeConsensusGroupTaskMeta = new PipeConsensusGroupTaskMeta();
+ PipeConsensusGroupTaskMeta.index.set(ReadWriteIOUtils.readLong(byteBuffer));
+ PipeConsensusGroupTaskMeta.regionLeader.set(ReadWriteIOUtils.readInt(byteBuffer));
+ return PipeConsensusGroupTaskMeta;
+ }
+
+ public static PipeConsensusGroupTaskMeta deserialize(InputStream inputStream) throws IOException {
+ return deserialize(
+ ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(inputStream)));
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ PipeConsensusGroupTaskMeta that = (PipeConsensusGroupTaskMeta) obj;
+ return index.get() == that.index.get() && regionLeader.get() == that.regionLeader.get();
+ }
+
+ @Override
+ public int hashCode() {
+ return (int) (index.get() * 31 + regionLeader.get());
+ }
+
+ @Override
+ public String toString() {
+ return "PipeTask{" + "index='" + index + '\'' + ", regionLeader='" + regionLeader + '\'' + '}';
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
new file mode 100644
index 0000000000..f75237bbdc
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.task.meta;
+
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class PipeMeta {
+
+ private final PipeStaticMeta staticMeta;
+ private final PipeRuntimeMeta runtimeMeta;
+
+ public PipeMeta(PipeStaticMeta staticMeta, PipeRuntimeMeta runtimeMeta) {
+ this.staticMeta = staticMeta;
+ this.runtimeMeta = runtimeMeta;
+ }
+
+ public PipeStaticMeta getStaticMeta() {
+ return staticMeta;
+ }
+
+ public PipeRuntimeMeta getRuntimeMeta() {
+ return runtimeMeta;
+ }
+
+ public ByteBuffer serialize() throws IOException {
+ PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+ serialize(outputStream);
+ return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+ }
+
+ public void serialize(DataOutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write(staticMeta.serialize(), outputStream);
+ ReadWriteIOUtils.write(runtimeMeta.serialize(), outputStream);
+ }
+
+ public void serialize(FileOutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write(staticMeta.serialize(), outputStream);
+ ReadWriteIOUtils.write(runtimeMeta.serialize(), outputStream);
+ }
+
+ public static PipeMeta deserialize(FileInputStream fileInputStream) throws IOException {
+ PipeStaticMeta staticMeta = PipeStaticMeta.deserialize(fileInputStream);
+ PipeRuntimeMeta runtimeMeta = PipeRuntimeMeta.deserialize(fileInputStream);
+ return new PipeMeta(staticMeta, runtimeMeta);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PipeMeta pipeMeta = (PipeMeta) o;
+ return Objects.equals(staticMeta, pipeMeta.staticMeta)
+ && Objects.equals(runtimeMeta, pipeMeta.runtimeMeta);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(staticMeta, runtimeMeta);
+ }
+
+ @Override
+ public String toString() {
+ return "PipeMeta{" + "staticMeta=" + staticMeta + ", runtimeMeta=" + runtimeMeta + '}';
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
new file mode 100644
index 0000000000..b45286abaf
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.pipe.task.meta;
+
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PipeMetaKeeper {
+
+ protected final Map<String, PipeMeta> pipeNameToPipeMetaMap;
+
+ public PipeMetaKeeper() {
+ pipeNameToPipeMetaMap = new ConcurrentHashMap<>();
+ }
+
+ public void addPipeMeta(String pipeName, PipeMeta pipeMeta) {
+ pipeNameToPipeMetaMap.put(pipeName, pipeMeta);
+ }
+
+ public PipeMeta getPipeMeta(String pipeName) {
+ return pipeNameToPipeMetaMap.get(pipeName);
+ }
+
+ public void removePipeMeta(String pipeName) {
+ pipeNameToPipeMetaMap.remove(pipeName);
+ }
+
+ public boolean containsPipeMeta(String pipeName) {
+ return pipeNameToPipeMetaMap.containsKey(pipeName);
+ }
+
+ public void clear() {
+ this.pipeNameToPipeMetaMap.clear();
+ }
+
+ public void processTakeSnapshot(FileOutputStream fileOutputStream) throws IOException {
+ ReadWriteIOUtils.write(pipeNameToPipeMetaMap.size(), fileOutputStream);
+ for (Map.Entry<String, PipeMeta> entry : pipeNameToPipeMetaMap.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), fileOutputStream);
+ entry.getValue().serialize(fileOutputStream);
+ }
+ }
+
+ public void processLoadSnapshot(FileInputStream fileInputStream) throws IOException {
+ clear();
+
+ final int size = ReadWriteIOUtils.readInt(fileInputStream);
+ for (int i = 0; i < size; i++) {
+ final String pipeName = ReadWriteIOUtils.readString(fileInputStream);
+ pipeNameToPipeMetaMap.put(pipeName, PipeMeta.deserialize(fileInputStream));
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PipeMetaKeeper that = (PipeMetaKeeper) o;
+ return Objects.equals(pipeNameToPipeMetaMap, that.pipeNameToPipeMetaMap);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(pipeNameToPipeMetaMap);
+ }
+
+ @Override
+ public String toString() {
+ return "PipeMetaKeeper{" + "pipeNameToPipeMetaMap=" + pipeNameToPipeMetaMap + '}';
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMeta.java
new file mode 100644
index 0000000000..faa01c67a6
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMeta.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.task.meta;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class PipeRuntimeMeta {
+
+ private final AtomicReference<PipeStatus> status;
+ private final List<String> exceptionMessages;
+ private final Map<TConsensusGroupId, PipeConsensusGroupTaskMeta> consensusGroupIdToTaskMetaMap;
+
+ public PipeRuntimeMeta() {
+ status = new AtomicReference<>(PipeStatus.STOPPED);
+ exceptionMessages = new LinkedList<>();
+ consensusGroupIdToTaskMetaMap = new ConcurrentHashMap<>();
+ }
+
+ public PipeRuntimeMeta(
+ Map<TConsensusGroupId, PipeConsensusGroupTaskMeta> consensusGroupIdToTaskMetaMap) {
+ status = new AtomicReference<>(PipeStatus.STOPPED);
+ exceptionMessages = new LinkedList<>();
+ this.consensusGroupIdToTaskMetaMap = consensusGroupIdToTaskMetaMap;
+ }
+
+ public AtomicReference<PipeStatus> getStatus() {
+ return status;
+ }
+
+ public List<String> getExceptionMessages() {
+ return exceptionMessages;
+ }
+
+ public Map<TConsensusGroupId, PipeConsensusGroupTaskMeta> getConsensusGroupIdToTaskMetaMap() {
+ return consensusGroupIdToTaskMetaMap;
+ }
+
+ public ByteBuffer serialize() throws IOException {
+ PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+ serialize(outputStream);
+ return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+ }
+
+ public void serialize(DataOutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write(status.get().getType(), outputStream);
+
+ // ignore exception messages
+
+ ReadWriteIOUtils.write(consensusGroupIdToTaskMetaMap.size(), outputStream);
+ for (Map.Entry<TConsensusGroupId, PipeConsensusGroupTaskMeta> entry :
+ consensusGroupIdToTaskMetaMap.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey().getId(), outputStream);
+ entry.getValue().serialize(outputStream);
+ }
+ }
+
+ public static PipeRuntimeMeta deserialize(InputStream inputStream) throws IOException {
+ return deserialize(
+ ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(inputStream)));
+ }
+
+ public static PipeRuntimeMeta deserialize(ByteBuffer byteBuffer) throws IOException {
+ final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta();
+
+ pipeRuntimeMeta.status.set(PipeStatus.getPipeStatus(ReadWriteIOUtils.readByte(byteBuffer)));
+
+ // ignore exception messages
+
+ final int size = ReadWriteIOUtils.readInt(byteBuffer);
+ for (int i = 0; i < size; ++i) {
+ pipeRuntimeMeta.consensusGroupIdToTaskMetaMap.put(
+ new TConsensusGroupId(
+ TConsensusGroupType.DataRegion, ReadWriteIOUtils.readInt(byteBuffer)),
+ PipeConsensusGroupTaskMeta.deserialize(byteBuffer));
+ }
+
+ return pipeRuntimeMeta;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PipeRuntimeMeta that = (PipeRuntimeMeta) o;
+ return Objects.equals(status.get().getType(), that.status.get().getType())
+ && exceptionMessages.equals(that.exceptionMessages)
+ && consensusGroupIdToTaskMetaMap.equals(that.consensusGroupIdToTaskMetaMap);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(status, exceptionMessages, consensusGroupIdToTaskMetaMap);
+ }
+
+ @Override
+ public String toString() {
+ return "PipeRuntimeMeta{"
+ + "status="
+ + status
+ + ", exceptionMessages="
+ + exceptionMessages
+ + ", consensusGroupIdToTaskMetaMap="
+ + consensusGroupIdToTaskMetaMap
+ + '}';
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
new file mode 100644
index 0000000000..0165899f74
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.pipe.task.meta;
+
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PipeStaticMeta {
+
+ private String pipeName;
+ private long createTime;
+
+ private Map<String, String> collectorAttributes = new HashMap<>();
+ private Map<String, String> processorAttributes = new HashMap<>();
+ private Map<String, String> connectorAttributes = new HashMap<>();
+
+ private PipeStaticMeta() {}
+
+ public PipeStaticMeta(
+ String pipeName,
+ long createTime,
+ Map<String, String> collectorAttributes,
+ Map<String, String> processorAttributes,
+ Map<String, String> connectorAttributes) {
+ this.pipeName = pipeName.toUpperCase();
+ this.createTime = createTime;
+ this.collectorAttributes = collectorAttributes;
+ this.processorAttributes = processorAttributes;
+ this.connectorAttributes = connectorAttributes;
+ }
+
+ public String getPipeName() {
+ return pipeName;
+ }
+
+ public long getCreateTime() {
+ return createTime;
+ }
+
+ public Map<String, String> getCollectorAttributes() {
+ return collectorAttributes;
+ }
+
+ public Map<String, String> getProcessorAttributes() {
+ return collectorAttributes;
+ }
+
+ public Map<String, String> getConnectorAttributes() {
+ return collectorAttributes;
+ }
+
+ public ByteBuffer serialize() throws IOException {
+ PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+ serialize(outputStream);
+ return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+ }
+
+ public void serialize(DataOutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write(pipeName, outputStream);
+ ReadWriteIOUtils.write(createTime, outputStream);
+
+ outputStream.writeInt(collectorAttributes.size());
+ for (Map.Entry<String, String> entry : collectorAttributes.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), outputStream);
+ ReadWriteIOUtils.write(entry.getValue(), outputStream);
+ }
+ outputStream.writeInt(processorAttributes.size());
+ for (Map.Entry<String, String> entry : processorAttributes.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), outputStream);
+ ReadWriteIOUtils.write(entry.getValue(), outputStream);
+ }
+ outputStream.writeInt(connectorAttributes.size());
+ for (Map.Entry<String, String> entry : connectorAttributes.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), outputStream);
+ ReadWriteIOUtils.write(entry.getValue(), outputStream);
+ }
+ }
+
+ public static PipeStaticMeta deserialize(InputStream inputStream) throws IOException {
+ return deserialize(
+ ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(inputStream)));
+ }
+
+ public static PipeStaticMeta deserialize(ByteBuffer byteBuffer) {
+ final PipeStaticMeta pipeStaticMeta = new PipeStaticMeta();
+
+ pipeStaticMeta.pipeName = ReadWriteIOUtils.readString(byteBuffer);
+ pipeStaticMeta.createTime = ReadWriteIOUtils.readLong(byteBuffer);
+
+ int size = byteBuffer.getInt();
+ for (int i = 0; i < size; ++i) {
+ pipeStaticMeta.collectorAttributes.put(
+ ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readString(byteBuffer));
+ }
+ size = byteBuffer.getInt();
+ for (int i = 0; i < size; ++i) {
+ pipeStaticMeta.processorAttributes.put(
+ ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readString(byteBuffer));
+ }
+ size = byteBuffer.getInt();
+ for (int i = 0; i < size; ++i) {
+ pipeStaticMeta.connectorAttributes.put(
+ ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readString(byteBuffer));
+ }
+
+ return pipeStaticMeta;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ PipeStaticMeta that = (PipeStaticMeta) obj;
+ return pipeName.equals(that.pipeName)
+ && createTime == that.createTime
+ && collectorAttributes.equals(that.collectorAttributes)
+ && processorAttributes.equals(that.processorAttributes)
+ && connectorAttributes.equals(that.connectorAttributes);
+ }
+
+ @Override
+ public int hashCode() {
+ return pipeName.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "PipeStaticMeta{"
+ + "pipeName='"
+ + pipeName
+ + '\''
+ + ", createTime="
+ + createTime
+ + ", collectorAttributes="
+ + collectorAttributes
+ + ", processorAttributes="
+ + processorAttributes
+ + ", connectorAttributes="
+ + connectorAttributes
+ + '}';
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStatus.java
similarity index 61%
rename from node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStatus.java
index 04653122c2..74f6c3403f 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStatus.java
@@ -19,4 +19,32 @@
package org.apache.iotdb.commons.pipe.task.meta;
-public class PipeTaskMetaAccessor {}
+public enum PipeStatus {
+ RUNNING((byte) 0),
+ STOPPED((byte) 1),
+ DROPPED((byte) 2),
+ ;
+
+ private final byte type;
+
+ PipeStatus(byte type) {
+ this.type = type;
+ }
+
+ public byte getType() {
+ return type;
+ }
+
+ public static PipeStatus getPipeStatus(byte type) {
+ switch (type) {
+ case 0:
+ return PipeStatus.RUNNING;
+ case 1:
+ return PipeStatus.STOPPED;
+ case 2:
+ return PipeStatus.DROPPED;
+ default:
+ throw new IllegalArgumentException("Invalid input: " + type);
+ }
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
index ec935e4c94..5b5fcf5e9a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.commons.sync.pipe;
public enum SyncOperation {
- // PIPESINK
CREATE_PIPESINK,
DROP_PIPESINK,
// PIPE
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
index 3be5d25f10..d31ea9b98a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
@@ -93,7 +93,7 @@ public class InsertRowStatement extends InsertBaseStatement {
this.values = new Object[measurements.length];
this.dataTypes = new TSDataType[measurements.length];
for (int i = 0; i < dataTypes.length; i++) {
- // types are not determined, the situation mainly occurs when the plan uses string values
+ // Types are not determined, the situation mainly occurs when the plan uses string values
// and is forwarded to other nodes
byte typeNum = (byte) ReadWriteIOUtils.read(buffer);
if (typeNum == TYPE_RAW_STRING || typeNum == TYPE_NULL) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 56b172bfa0..34db75c702 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -38,14 +38,12 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.commons.exception.sync.PipeException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
-import org.apache.iotdb.commons.sync.pipe.PipeInfo;
import org.apache.iotdb.commons.sync.pipe.SyncOperation;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.commons.udf.UDFInformation;
@@ -113,7 +111,6 @@ import org.apache.iotdb.db.quotas.DataNodeSpaceQuotaManager;
import org.apache.iotdb.db.quotas.DataNodeThrottleQuotaManager;
import org.apache.iotdb.db.service.DataNode;
import org.apache.iotdb.db.service.RegionMigrateService;
-import org.apache.iotdb.db.sync.SyncService;
import org.apache.iotdb.db.trigger.executor.TriggerExecutor;
import org.apache.iotdb.db.trigger.executor.TriggerFireResult;
import org.apache.iotdb.db.trigger.service.TriggerManagementService;
@@ -744,13 +741,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
@Override
public TSStatus createPipeOnDataNode(TCreatePipeOnDataNodeReq req) {
- try {
- PipeInfo pipeInfo = PipeInfo.deserializePipeInfo(req.pipeInfo);
- SyncService.getInstance().addPipe(pipeInfo);
- return RpcUtils.SUCCESS_STATUS;
- } catch (PipeException e) {
- return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
- }
+ throw new NotImplementedException("TODO: createPipeOnDataNode");
}
@Override
@@ -758,44 +749,18 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
try {
switch (SyncOperation.values()[req.getOperation()]) {
case START_PIPE:
- SyncService.getInstance().startPipe(req.getPipeName());
- break;
case STOP_PIPE:
- SyncService.getInstance().stopPipe(req.getPipeName());
- break;
case DROP_PIPE:
- SyncService.getInstance().dropPipe(req.getPipeName());
- break;
+ throw new NotImplementedException("TODO: operatePipeOnDataNode");
default:
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
.setMessage("Unsupported operation.");
}
- return RpcUtils.SUCCESS_STATUS;
- } catch (PipeException e) {
+ } catch (Exception e) {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
}
}
- @Override
- public TSStatus operatePipeOnDataNodeForRollback(TOperatePipeOnDataNodeReq req) {
- // Operate PIPE on DataNode for rollback, createTime in req is required.
- switch (SyncOperation.values()[req.getOperation()]) {
- case START_PIPE:
- SyncService.getInstance().startPipe(req.getPipeName(), req.getCreateTime());
- break;
- case STOP_PIPE:
- SyncService.getInstance().stopPipe(req.getPipeName(), req.getCreateTime());
- break;
- case DROP_PIPE:
- SyncService.getInstance().dropPipe(req.getPipeName(), req.getCreateTime());
- break;
- default:
- return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
- .setMessage("Unsupported operation.");
- }
- return RpcUtils.SUCCESS_STATUS;
- }
-
@Override
public TSStatus executeCQ(TExecuteCQ req) {
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index c9558af83d..3b8d18de89 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -356,14 +356,13 @@ struct TCountPathsUsingTemplateResp{
}
struct TCreatePipeOnDataNodeReq{
- 1: required binary pipeInfo
+ 1: required binary pipeMeta
}
struct TOperatePipeOnDataNodeReq {
1: required string pipeName
// ordinal of {@linkplain SyncOperation}
2: required i8 operation
- 3: optional i64 createTime
}
// ====================================================
@@ -751,11 +750,6 @@ service IDataNodeRPCService {
*/
common.TSStatus operatePipeOnDataNode(TOperatePipeOnDataNodeReq req)
- /**
- * Start, stop or drop PIPE on DataNode for rollback
- */
- common.TSStatus operatePipeOnDataNodeForRollback(TOperatePipeOnDataNodeReq req)
-
/**
* Execute CQ on DataNode
*/