You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/10/11 07:25:01 UTC
[iotdb] branch master updated: [IOTDB-4528] Operate Pipe Procedure (#7533)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e9c2871b27 [IOTDB-4528] Operate Pipe Procedure (#7533)
e9c2871b27 is described below
commit e9c2871b273e77df1758dde1389ba490befca4c5
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Tue Oct 11 15:24:55 2022 +0800
[IOTDB-4528] Operate Pipe Procedure (#7533)
---
.../confignode/client/DataNodeRequestType.java | 4 +
.../client/async/AsyncDataNodeClientPool.java | 14 ++
.../client/async/handlers/AsyncClientHandler.java | 2 +
.../consensus/request/ConfigPhysicalPlan.java | 12 ++
.../consensus/request/ConfigPhysicalPlanType.java | 3 +
.../request/write/sync/GetPipeSinkPlan.java | 1 +
...GetPipeSinkPlan.java => PreCreatePipePlan.java} | 25 +--
...GetPipeSinkPlan.java => SetPipeStatusPlan.java} | 41 +++--
.../{GetPipeSinkPlan.java => ShowPipePlan.java} | 23 +--
.../confignode/consensus/response/PipeResp.java | 37 ++++-
.../iotdb/confignode/manager/ConfigManager.java | 59 +++++++
.../apache/iotdb/confignode/manager/IManager.java | 50 ++++++
.../iotdb/confignode/manager/ProcedureManager.java | 74 +++++++++
.../iotdb/confignode/manager/SyncManager.java | 98 ++++++++++-
.../persistence/executor/ConfigPlanExecutor.java | 9 +
.../persistence/sync/ClusterSyncInfo.java | 79 ++++++++-
.../procedure/env/ConfigNodeProcedureEnv.java | 7 +
.../impl/sync/AbstractOperatePipeProcedure.java | 142 ++++++++++++++++
.../procedure/impl/sync/CreatePipeProcedure.java | 146 ++++++++++++++++
.../procedure/impl/sync/DropPipeProcedure.java | 133 +++++++++++++++
.../procedure/impl/sync/StartPipeProcedure.java | 132 +++++++++++++++
.../procedure/impl/sync/StopPipeProcedure.java | 132 +++++++++++++++
.../procedure/state/sync/OperatePipeState.java | 12 +-
.../procedure/store/ProcedureFactory.java | 30 +++-
.../thrift/ConfigNodeRPCServiceProcessor.java | 28 ++++
.../request/ConfigPhysicalPlanSerDeTest.java | 42 +++++
.../procedure/impl/OperatePipeProcedureTest.java | 122 ++++++++++++++
.../org/apache/iotdb/db/it/sync/IoTDBPipeIT.java | 3 +-
.../sync/PipeNotExistException.java} | 17 +-
.../iotdb/commons/sync/metadata/SyncMetadata.java | 38 ++---
.../commons/sync/persistence/SyncLogReader.java | 7 +-
.../apache/iotdb/commons/sync/pipe/PipeInfo.java | 58 +++++--
.../apache/iotdb/commons/sync/pipe/PipeStatus.java | 7 +-
.../iotdb/commons/sync/pipe/SyncOperation.java | 2 +
.../iotdb/commons/sync/pipe/TsFilePipeInfo.java | 15 ++
.../iotdb/commons/sync/pipesink/IoTDBPipeSink.java | 8 +
.../iotdb/commons/sync/pipesink/PipeSink.java | 20 +++
.../commons/utils/ThriftConfigNodeSerDeUtils.java | 23 ++-
.../apache/iotdb/db/client/ConfigNodeClient.java | 83 ++++++++++
.../iotdb/db/localconfignode/LocalConfigNode.java | 11 +-
.../config/executor/ClusterConfigTaskExecutor.java | 107 +++++++++---
.../executor/StandaloneConfigTaskExecutor.java | 12 +-
.../config/sys/sync/ShowPipeSinkTask.java | 2 +-
.../execution/config/sys/sync/ShowPipeTask.java | 6 +-
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 4 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 40 +++++
.../java/org/apache/iotdb/db/sync/SyncService.java | 184 +++++++++++----------
.../db/sync/common/ClusterSyncInfoFetcher.java | 144 ++++++++++++++++
.../iotdb/db/sync/common/ISyncInfoFetcher.java | 10 +-
.../apache/iotdb/db/sync/common/LocalSyncInfo.java | 41 ++---
.../iotdb/db/sync/common/LocalSyncInfoFetcher.java | 16 +-
.../db/sync/sender/pipe/ExternalPipeSink.java | 8 +
.../apache/iotdb/db/utils/sync/SyncPipeUtil.java | 74 ++++-----
.../sync/receiver/manager/LocalSyncInfoTest.java | 10 +-
.../src/main/thrift/confignode.thrift | 30 +++-
thrift/src/main/thrift/datanode.thrift | 20 +++
56 files changed, 2132 insertions(+), 325 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index e3ac3c249e..e2e52f3613 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -60,6 +60,10 @@ public enum DataNodeRequestType {
ACTIVE_TRIGGER_INSTANCE,
INACTIVE_TRIGGER_INSTANCE,
+ /** Sync */
+ PRE_CREATE_PIPE,
+ OPERATE_PIPE,
+
/** TEMPLATE */
UPDATE_TEMPLATE,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index 98fe826e93..24865fddbe 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
+import org.apache.iotdb.mpp.rpc.thrift.TCreatePipeOnDataNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteTimeSeriesReq;
@@ -43,6 +44,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
+import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
@@ -244,6 +246,18 @@ public class AsyncDataNodeClientPool {
(DeleteTimeSeriesRPCHandler)
clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
break;
+ case PRE_CREATE_PIPE:
+ client.createPipeOnDataNode(
+ (TCreatePipeOnDataNodeReq) clientHandler.getRequest(requestId),
+ (AsyncTSStatusRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
+ case OPERATE_PIPE:
+ client.operatePipeOnDataNode(
+ (TOperatePipeOnDataNodeReq) clientHandler.getRequest(requestId),
+ (AsyncTSStatusRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
default:
LOGGER.error(
"Unexpected DataNode Request Type: {} when sendAsyncRequestToDataNode",
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
index 60c7878fe4..a10ef2306c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
@@ -188,6 +188,8 @@ public class AsyncClientHandler<Q, R> {
case UPDATE_REGION_ROUTE_MAP:
case BROADCAST_LATEST_CONFIG_NODE_GROUP:
case INVALIDATE_MATCHED_SCHEMA_CACHE:
+ case PRE_CREATE_PIPE:
+ case OPERATE_PIPE:
default:
return new AsyncTSStatusRPCHandler(
requestType,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index 46b72279bf..62d1bab979 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -63,6 +63,9 @@ import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTimeP
import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
@@ -274,6 +277,15 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest {
case GetPipeSink:
req = new GetPipeSinkPlan();
break;
+ case PreCreatePipe:
+ req = new PreCreatePipePlan();
+ break;
+ case SetPipeStatus:
+ req = new SetPipeStatusPlan();
+ break;
+ case ShowPipe:
+ req = new ShowPipePlan();
+ break;
case GetRouting:
req = new GetRoutingPlan();
break;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index 6f467b5c01..ca96142d45 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -81,6 +81,9 @@ public enum ConfigPhysicalPlanType {
CreatePipeSink,
DropPipeSink,
GetPipeSink,
+ PreCreatePipe,
+ SetPipeStatus,
+ ShowPipe,
AddTriggerInTable,
DeleteTriggerInTable,
GetTriggerTable,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java
index 688578ed7e..9cfcecfc65 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
public class GetPipeSinkPlan extends ConfigPhysicalPlan {
+ /** An empty pipeSinkName means all PIPESINKs are requested. */
private String pipeSinkName;
public GetPipeSinkPlan() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/PreCreatePipePlan.java
similarity index 70%
copy from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/PreCreatePipePlan.java
index 688578ed7e..016428a083 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/PreCreatePipePlan.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.confignode.consensus.request.write.sync;
-import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
@@ -26,30 +26,31 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-public class GetPipeSinkPlan extends ConfigPhysicalPlan {
- private String pipeSinkName;
+public class PreCreatePipePlan extends ConfigPhysicalPlan {
- public GetPipeSinkPlan() {
- super(ConfigPhysicalPlanType.GetPipeSink);
+ private PipeInfo pipeInfo;
+
+ public PreCreatePipePlan() {
+ super(ConfigPhysicalPlanType.PreCreatePipe);
}
- public GetPipeSinkPlan(String pipeSinkName) {
+ public PreCreatePipePlan(PipeInfo pipeInfo) {
this();
- this.pipeSinkName = pipeSinkName;
+ this.pipeInfo = pipeInfo;
}
- public String getPipeSinkName() {
- return pipeSinkName;
+ public PipeInfo getPipeInfo() {
+ return pipeInfo;
}
@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
- stream.writeInt(ConfigPhysicalPlanType.GetPipeSink.ordinal());
- BasicStructureSerDeUtil.write(pipeSinkName, stream);
+ stream.writeInt(ConfigPhysicalPlanType.PreCreatePipe.ordinal());
+ pipeInfo.serialize(stream);
}
@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
- pipeSinkName = BasicStructureSerDeUtil.readString(buffer);
+ pipeInfo = PipeInfo.deserializePipeInfo(buffer);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlan.java
similarity index 56%
copy from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlan.java
index 688578ed7e..13b0b6dfb1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlan.java
@@ -18,38 +18,55 @@
*/
package org.apache.iotdb.confignode.consensus.request.write.sync;
-import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
+import org.apache.iotdb.commons.sync.pipe.PipeStatus;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-public class GetPipeSinkPlan extends ConfigPhysicalPlan {
- private String pipeSinkName;
+public class SetPipeStatusPlan extends ConfigPhysicalPlan {
+ private String pipeName;
+ private PipeStatus pipeStatus;
- public GetPipeSinkPlan() {
- super(ConfigPhysicalPlanType.GetPipeSink);
+ public SetPipeStatusPlan() {
+ super(ConfigPhysicalPlanType.SetPipeStatus);
}
- public GetPipeSinkPlan(String pipeSinkName) {
+ public SetPipeStatusPlan(String pipeName, PipeStatus pipeStatus) {
this();
- this.pipeSinkName = pipeSinkName;
+ this.pipeName = pipeName;
+ this.pipeStatus = pipeStatus;
}
- public String getPipeSinkName() {
- return pipeSinkName;
+ public String getPipeName() {
+ return pipeName;
+ }
+
+ public void setPipeName(String pipeName) {
+ this.pipeName = pipeName;
+ }
+
+ public PipeStatus getPipeStatus() {
+ return pipeStatus;
+ }
+
+ public void setPipeStatus(PipeStatus pipeStatus) {
+ this.pipeStatus = pipeStatus;
}
@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
- stream.writeInt(ConfigPhysicalPlanType.GetPipeSink.ordinal());
- BasicStructureSerDeUtil.write(pipeSinkName, stream);
+ stream.writeInt(ConfigPhysicalPlanType.SetPipeStatus.ordinal());
+ ReadWriteIOUtils.write(pipeName, stream);
+ ReadWriteIOUtils.write((byte) pipeStatus.ordinal(), stream);
}
@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
- pipeSinkName = BasicStructureSerDeUtil.readString(buffer);
+ pipeName = ReadWriteIOUtils.readString(buffer);
+ pipeStatus = PipeStatus.values()[ReadWriteIOUtils.readByte(buffer)];
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/ShowPipePlan.java
similarity index 73%
copy from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/ShowPipePlan.java
index 688578ed7e..36c6fc3083 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/ShowPipePlan.java
@@ -26,30 +26,31 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-public class GetPipeSinkPlan extends ConfigPhysicalPlan {
- private String pipeSinkName;
+public class ShowPipePlan extends ConfigPhysicalPlan {
+ /** An empty pipeName means all PIPEs are shown. */
+ private String pipeName;
- public GetPipeSinkPlan() {
- super(ConfigPhysicalPlanType.GetPipeSink);
+ public ShowPipePlan() {
+ super(ConfigPhysicalPlanType.ShowPipe);
}
- public GetPipeSinkPlan(String pipeSinkName) {
+ public ShowPipePlan(String pipeName) {
this();
- this.pipeSinkName = pipeSinkName;
+ this.pipeName = pipeName;
}
- public String getPipeSinkName() {
- return pipeSinkName;
+ public String getPipeName() {
+ return pipeName;
}
@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
- stream.writeInt(ConfigPhysicalPlanType.GetPipeSink.ordinal());
- BasicStructureSerDeUtil.write(pipeSinkName, stream);
+ stream.writeInt(ConfigPhysicalPlanType.ShowPipe.ordinal());
+ BasicStructureSerDeUtil.write(pipeName, stream);
}
@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
- pipeSinkName = BasicStructureSerDeUtil.readString(buffer);
+ pipeName = BasicStructureSerDeUtil.readString(buffer);
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/PipeResp.java
similarity index 56%
copy from node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/PipeResp.java
index d2bfdf743b..11a5c618d0 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/PipeResp.java
@@ -16,13 +16,32 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.commons.sync.pipe;
-
-public enum SyncOperation {
- CREATE_PIPESINK,
- DROP_PIPESINK,
- CREATE_PIPE,
- START_PIPE,
- STOP_PIPE,
- DROP_PIPE
+package org.apache.iotdb.confignode.consensus.response;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.consensus.common.DataSet;
+
+import java.util.List;
+
+public class PipeResp implements DataSet {
+
+ TSStatus status;
+ List<PipeInfo> pipeInfoList;
+
+ public TSStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(TSStatus status) {
+ this.status = status;
+ }
+
+ public List<PipeInfo> getPipeInfoList() {
+ return pipeInfoList;
+ }
+
+ public void setPipeInfoList(List<PipeInfo> pipeInfoList) {
+ this.pipeInfoList = pipeInfoList;
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 2e4729cbc0..c1bde6dbc2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -102,6 +102,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
@@ -110,6 +111,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowStorageGroupResp;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.consensus.common.DataSet;
@@ -609,6 +612,11 @@ public class ConfigManager implements IManager {
return triggerManager;
}
+ @Override
+ public SyncManager getSyncManager() {
+ return syncManager;
+ }
+
@Override
public TSStatus operatePermission(AuthorPlan authorPlan) {
TSStatus status = confirmLeader();
@@ -1010,6 +1018,57 @@ public class ConfigManager implements IManager {
}
}
+ @Override
+ public TSStatus createPipe(TPipeInfo pipeInfo) {
+ TSStatus status = confirmLeader();
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return procedureManager.createPipe(pipeInfo);
+ } else {
+ return status;
+ }
+ }
+
+ @Override
+ public TSStatus startPipe(String pipeName) {
+ TSStatus status = confirmLeader();
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return procedureManager.startPipe(pipeName);
+ } else {
+ return status;
+ }
+ }
+
+ @Override
+ public TSStatus stopPipe(String pipeName) {
+ TSStatus status = confirmLeader();
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return procedureManager.stopPipe(pipeName);
+ } else {
+ return status;
+ }
+ }
+
+ @Override
+ public TSStatus dropPipe(String pipeName) {
+ TSStatus status = confirmLeader();
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return procedureManager.dropPipe(pipeName);
+ } else {
+ return status;
+ }
+ }
+
+ @Override
+ public TShowPipeResp showPipe(TShowPipeReq req) {
+ TSStatus status = confirmLeader();
+ TShowPipeResp resp = new TShowPipeResp();
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return syncManager.showPipe(req.getPipeName());
+ } else {
+ return resp.setStatus(status);
+ }
+ }
+
@Override
@TestOnly
public TGetRoutingResp getRouting(GetRoutingPlan plan) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index d028f3425c..514cc03ba7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
@@ -70,6 +71,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowStorageGroupResp;
import org.apache.iotdb.consensus.common.DataSet;
@@ -130,6 +133,13 @@ public interface IManager {
*/
TriggerManager getTriggerManager();
+ /**
+ * Get SyncManager
+ *
+ * @return SyncManager instance
+ */
+ SyncManager getSyncManager();
+
/**
* Get ProcedureManager
*
@@ -406,6 +416,46 @@ public interface IManager {
*/
TGetPipeSinkResp getPipeSink(TGetPipeSinkReq req);
+ /**
+ * Create Pipe
+ *
+ * @param pipeInfo Info about Pipe
+ * @return TSStatus
+ */
+ TSStatus createPipe(TPipeInfo pipeInfo);
+
+ /**
+ * Start Pipe
+ *
+ * @param pipeName name of Pipe
+ * @return TSStatus
+ */
+ TSStatus startPipe(String pipeName);
+
+ /**
+ * Stop Pipe
+ *
+ * @param pipeName name of Pipe
+ * @return TSStatus
+ */
+ TSStatus stopPipe(String pipeName);
+
+ /**
+ * Drop Pipe
+ *
+ * @param pipeName name of Pipe
+ * @return TSStatus
+ */
+ TSStatus dropPipe(String pipeName);
+
+ /**
+ * Get a Pipe by name. If pipeName is empty, get all Pipes.
+ *
+ * @param req specify the pipeName
+ * @return a TShowPipeResp containing the TShowPipeInfo list
+ */
+ TShowPipeResp showPipe(TShowPipeReq req);
+
TGetRoutingResp getRouting(GetRoutingPlan plan);
TGetTimeSlotListResp getTimeSlotList(GetTimeSlotListPlan plan);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index e57b16e56f..ef583c341e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.sync.PipeException;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.commons.utils.StatusUtils;
@@ -44,6 +45,10 @@ import org.apache.iotdb.confignode.procedure.impl.statemachine.CreateRegionGroup
import org.apache.iotdb.confignode.procedure.impl.statemachine.DeleteStorageGroupProcedure;
import org.apache.iotdb.confignode.procedure.impl.statemachine.DeleteTimeSeriesProcedure;
import org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure;
+import org.apache.iotdb.confignode.procedure.impl.sync.CreatePipeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.sync.DropPipeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.sync.StartPipeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.sync.StopPipeProcedure;
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
import org.apache.iotdb.confignode.procedure.store.ConfigProcedureStore;
@@ -52,6 +57,7 @@ import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.apache.iotdb.confignode.procedure.store.ProcedureStore;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.rpc.RpcUtils;
@@ -266,6 +272,74 @@ public class ProcedureManager {
}
}
+ public TSStatus createPipe(TPipeInfo req) {
+ try {
+ long procedureId = executor.submitProcedure(new CreatePipeProcedure(req));
+ List<TSStatus> statusList = new ArrayList<>();
+ boolean isSucceed =
+ waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
+ if (isSucceed) {
+ return RpcUtils.SUCCESS_STATUS;
+ } else {
+ return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
+ .setMessage(statusList.get(0).getMessage());
+ }
+ } catch (PipeException e) {
+ return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
+ }
+ }
+
+ public TSStatus startPipe(String pipeName) {
+ try {
+ long procedureId = executor.submitProcedure(new StartPipeProcedure(pipeName));
+ List<TSStatus> statusList = new ArrayList<>();
+ boolean isSucceed =
+ waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
+ if (isSucceed) {
+ return RpcUtils.SUCCESS_STATUS;
+ } else {
+ return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
+ .setMessage(statusList.get(0).getMessage());
+ }
+ } catch (PipeException e) {
+ return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
+ }
+ }
+
+ public TSStatus stopPipe(String pipeName) {
+ try {
+ long procedureId = executor.submitProcedure(new StopPipeProcedure(pipeName));
+ List<TSStatus> statusList = new ArrayList<>();
+ boolean isSucceed =
+ waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
+ if (isSucceed) {
+ return RpcUtils.SUCCESS_STATUS;
+ } else {
+ return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
+ .setMessage(statusList.get(0).getMessage());
+ }
+ } catch (PipeException e) {
+ return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
+ }
+ }
+
+ public TSStatus dropPipe(String pipeName) {
+ try {
+ long procedureId = executor.submitProcedure(new DropPipeProcedure(pipeName));
+ List<TSStatus> statusList = new ArrayList<>();
+ boolean isSucceed =
+ waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
+ if (isSucceed) {
+ return RpcUtils.SUCCESS_STATUS;
+ } else {
+ return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
+ .setMessage(statusList.get(0).getMessage());
+ }
+ } catch (PipeException e) {
+ return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
+ }
+ }
+
/**
* Waiting until the specific procedures finished
*
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java
index 10af4dac17..c8af8831f8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java
@@ -18,20 +18,40 @@
*/
package org.apache.iotdb.confignode.manager;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.sync.PipeException;
import org.apache.iotdb.commons.exception.sync.PipeSinkException;
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.commons.sync.pipe.PipeStatus;
+import org.apache.iotdb.commons.sync.pipe.SyncOperation;
import org.apache.iotdb.commons.sync.pipesink.PipeSink;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan;
+import org.apache.iotdb.confignode.consensus.response.PipeResp;
import org.apache.iotdb.confignode.consensus.response.PipeSinkResp;
+import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
+import org.apache.iotdb.mpp.rpc.thrift.TCreatePipeOnDataNodeReq;
+import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
public class SyncManager {
@@ -88,10 +108,84 @@ public class SyncManager {
// endregion
// ======================================================
- // region Implement of PipeS
+ // region Implement of Pipe
// ======================================================
- // TODO....
+ /**
+  * Run the pre-creation check for a PIPE by delegating to {@code clusterSyncInfo}.
+  *
+  * @param pipeInfo description of the PIPE that is about to be created
+  * @throws PipeException if the delegated check rejects the PIPE
+  */
+ public void checkAddPipe(PipeInfo pipeInfo) throws PipeException {
+   clusterSyncInfo.checkAddPipe(pipeInfo);
+ }
+
+ /**
+  * Set the status of {@code pipeInfo} to {@link PipeStatus#PREPARE_CREATE} and write a {@link
+  * PreCreatePipePlan} through the consensus manager.
+  *
+  * @param pipeInfo PIPE to pre-create; its status field is overwritten by this call
+  * @return status of the consensus write
+  */
+ public TSStatus preCreatePipe(PipeInfo pipeInfo) {
+   pipeInfo.setStatus(PipeStatus.PREPARE_CREATE);
+   final PreCreatePipePlan plan = new PreCreatePipePlan(pipeInfo);
+   return getConsensusManager().write(plan).getStatus();
+ }
+
+ /**
+  * Write a {@link SetPipeStatusPlan} for the given PIPE through the consensus manager.
+  *
+  * @param pipeName name of the PIPE whose status changes
+  * @param pipeStatus status to record
+  * @return status of the consensus write
+  */
+ public TSStatus setPipeStatus(String pipeName, PipeStatus pipeStatus) {
+   final SetPipeStatusPlan plan = new SetPipeStatusPlan(pipeName, pipeStatus);
+   return getConsensusManager().write(plan).getStatus();
+ }
+
+ /**
+  * Read PIPE information through the consensus manager and convert it to the thrift response.
+  *
+  * @param pipeName name passed on to the {@link ShowPipePlan}
+  * @return response carrying the read status; the PIPE list is only filled in when that status
+  *     is {@link TSStatusCode#SUCCESS_STATUS}
+  */
+ public TShowPipeResp showPipe(String pipeName) {
+   final PipeResp queried =
+       (PipeResp) getConsensusManager().read(new ShowPipePlan(pipeName)).getDataset();
+   final TShowPipeResp result = new TShowPipeResp();
+   result.setStatus(queried.getStatus());
+   if (queried.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+     // Failed read: hand back the status only
+     return result;
+   }
+   result.setPipeInfoList(
+       queried.getPipeInfoList().stream()
+           .map(PipeInfo::getTShowPipeInfo)
+           .collect(Collectors.toList()));
+   return result;
+ }
+
+ /**
+  * Look up a PIPE by name by delegating to {@code clusterSyncInfo}.
+  *
+  * @param pipeName name of the PIPE
+  * @return the matching {@link PipeInfo}
+  * @throws PipeException if the delegated lookup fails
+  */
+ public PipeInfo getPipeInfo(String pipeName) throws PipeException {
+   return clusterSyncInfo.getPipeInfo(pipeName);
+ }
+
+ /**
+  * Broadcast a PIPE operation to all registered DataNodes and return their responses.
+  *
+  * @param pipeName name of PIPE
+  * @param operation only support {@link SyncOperation#START_PIPE}, {@link SyncOperation#STOP_PIPE}
+  *     and {@link SyncOperation#DROP_PIPE}
+  * @return list of TSStatus collected from the DataNodes
+  */
+ public List<TSStatus> operatePipeOnDataNodes(String pipeName, SyncOperation operation) {
+   NodeManager nodeManager = configManager.getNodeManager();
+   final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+       nodeManager.getRegisteredDataNodeLocations();
+   // The operation is sent as its enum ordinal narrowed to a single byte
+   final TOperatePipeOnDataNodeReq request =
+       new TOperatePipeOnDataNodeReq(pipeName, (byte) operation.ordinal());
+
+   AsyncClientHandler<TOperatePipeOnDataNodeReq, TSStatus> clientHandler =
+       new AsyncClientHandler<>(DataNodeRequestType.OPERATE_PIPE, request, dataNodeLocationMap);
+   AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+
+   // Return what the DataNodes answered. This method used to return a separately allocated list
+   // that nothing ever wrote to, so callers always saw an empty result and could not detect a
+   // failed DataNode.
+   // NOTE(review): assumes the handler holds all responses once the send call returns - confirm.
+   return clientHandler.getResponseList();
+ }
+
+ /**
+  * Broadcast a pre-create PIPE request to all registered DataNodes and return their responses.
+  *
+  * @param pipeInfo PIPE to create; sent in the form produced by {@code serializeToByteBuffer()}
+  * @return list of TSStatus collected from the DataNodes
+  */
+ public List<TSStatus> preCreatePipeOnDataNodes(PipeInfo pipeInfo) {
+   NodeManager nodeManager = configManager.getNodeManager();
+   final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+       nodeManager.getRegisteredDataNodeLocations();
+   final TCreatePipeOnDataNodeReq request =
+       new TCreatePipeOnDataNodeReq(pipeInfo.serializeToByteBuffer());
+
+   AsyncClientHandler<TCreatePipeOnDataNodeReq, TSStatus> clientHandler =
+       new AsyncClientHandler<>(DataNodeRequestType.PRE_CREATE_PIPE, request, dataNodeLocationMap);
+   AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+
+   // Return what the DataNodes answered. This method used to return a separately allocated list
+   // that nothing ever wrote to, so callers always saw an empty result and could not detect a
+   // failed DataNode.
+   // NOTE(review): assumes the handler holds all responses once the send call returns - confirm.
+   return clientHandler.getResponseList();
+ }
// endregion
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 64b488cdca..983aafcf79 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -62,6 +62,9 @@ import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTimeP
import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
@@ -173,6 +176,8 @@ public class ConfigPlanExecutor {
return clusterSchemaInfo.getAllTemplateSetInfo();
case GetPipeSink:
return syncInfo.getPipeSink((GetPipeSinkPlan) req);
+ case ShowPipe:
+ return syncInfo.showPipe((ShowPipePlan) req);
case GetTriggerTable:
return triggerInfo.getTriggerTable();
case GetRouting:
@@ -268,6 +273,10 @@ public class ConfigPlanExecutor {
return syncInfo.addPipeSink((CreatePipeSinkPlan) physicalPlan);
case DropPipeSink:
return syncInfo.dropPipeSink((DropPipeSinkPlan) physicalPlan);
+ case PreCreatePipe:
+ return syncInfo.preCreatePipe((PreCreatePipePlan) physicalPlan);
+ case SetPipeStatus:
+ return syncInfo.operatePipe((SetPipeStatusPlan) physicalPlan);
default:
throw new UnknownPhysicalPlanTypeException(physicalPlan.getType());
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java
index c42dd0d75f..1cda416fe8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java
@@ -19,12 +19,19 @@
package org.apache.iotdb.confignode.persistence.sync;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.sync.PipeException;
+import org.apache.iotdb.commons.exception.sync.PipeNotExistException;
import org.apache.iotdb.commons.exception.sync.PipeSinkException;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.commons.sync.metadata.SyncMetadata;
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan;
+import org.apache.iotdb.confignode.consensus.response.PipeResp;
import org.apache.iotdb.confignode.consensus.response.PipeSinkResp;
import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
import org.apache.iotdb.rpc.RpcUtils;
@@ -37,7 +44,9 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
public class ClusterSyncInfo implements SnapshotProcessor {
@@ -65,9 +74,10 @@ public class ClusterSyncInfo implements SnapshotProcessor {
public TSStatus addPipeSink(CreatePipeSinkPlan plan) {
TSStatus status = new TSStatus();
try {
- syncMetadata.addPipeSink(SyncPipeUtil.parsePipeInfoAsPipe(plan.getPipeSinkInfo()));
+ syncMetadata.addPipeSink(SyncPipeUtil.parseTPipeSinkInfoAsPipeSink(plan.getPipeSinkInfo()));
status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (PipeSinkException e) {
+ LOGGER.error("failed to execute CreatePipeSinkPlan {} on ClusterSyncInfo", plan, e);
status.setCode(TSStatusCode.PIPESINK_ERROR.getStatusCode());
LOGGER.error(e.getMessage());
}
@@ -103,6 +113,73 @@ public class ClusterSyncInfo implements SnapshotProcessor {
// endregion
+ // ======================================================
+ // region Implement of Pipe
+ // ======================================================
+
+ /**
+  * Validate a PIPE before it is created by delegating to {@code syncMetadata}.
+  *
+  * @param pipeInfo pipe info
+  * @throws PipeException if a Pipe with the same name already exists or the PipeSink does not
+  *     exist
+  */
+ public void checkAddPipe(PipeInfo pipeInfo) throws PipeException {
+   syncMetadata.checkAddPipe(pipeInfo);
+ }
+
+
+ /**
+  * Apply a {@link PreCreatePipePlan}: add the PIPE carried by the plan to {@code syncMetadata}.
+  *
+  * @param physicalPlan plan holding the PIPE to add
+  * @return a {@link TSStatusCode#SUCCESS_STATUS} status
+  */
+ public TSStatus preCreatePipe(PreCreatePipePlan physicalPlan) {
+   final PipeInfo newPipe = physicalPlan.getPipeInfo();
+   syncMetadata.addPipe(newPipe);
+   return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+
+ /**
+  * Apply a {@link SetPipeStatusPlan}: record the new status of the named PIPE in {@code
+  * syncMetadata}.
+  *
+  * @param physicalPlan plan holding the PIPE name and the status to set
+  * @return {@link TSStatusCode#SUCCESS_STATUS} on success; otherwise {@link
+  *     TSStatusCode#PIPE_ERROR} with the exception message attached
+  */
+ public TSStatus operatePipe(SetPipeStatusPlan physicalPlan) {
+   TSStatus status = new TSStatus();
+   try {
+     syncMetadata.setPipeStatus(physicalPlan.getPipeName(), physicalPlan.getPipeStatus());
+     status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+   } catch (PipeException e) {
+     // One log entry with the stack trace is enough; the message is part of it
+     LOGGER.error("failed to execute SetPipeStatusPlan {} on ClusterSyncInfo", physicalPlan, e);
+     status.setCode(TSStatusCode.PIPE_ERROR.getStatusCode());
+     // Carry the reason in the status: the pipe procedures build their PipeException from
+     // status.getMessage(), which was previously left unset on this path
+     status.setMessage(e.getMessage());
+   }
+   return status;
+ }
+
+ /**
+  * Answer a {@link ShowPipePlan}.
+  *
+  * @param plan plan holding the PIPE name to look for; an empty name selects every PIPE
+  * @return response with a success status and the PIPEs whose name equals the requested one, or
+  *     all PIPEs when no name was given
+  */
+ public PipeResp showPipe(ShowPipePlan plan) {
+   // TODO(sync): optimize logic later
+   final List<PipeInfo> candidates = syncMetadata.getAllPipeInfos();
+   final List<PipeInfo> selected;
+   if (StringUtils.isEmpty(plan.getPipeName())) {
+     selected = candidates;
+   } else {
+     selected = new ArrayList<>();
+     for (PipeInfo candidate : candidates) {
+       if (plan.getPipeName().equals(candidate.getPipeName())) {
+         selected.add(candidate);
+       }
+     }
+   }
+
+   final PipeResp resp = new PipeResp();
+   resp.setPipeInfoList(selected);
+   resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+   return resp;
+ }
+
+ /**
+  * Get PipeInfo by pipeName. Used as the check before start, stop and drop operations.
+  *
+  * <p>Only the PIPE returned by {@code syncMetadata.getRunningPipeInfo()} is considered.
+  *
+  * @param pipeName pipe name
+  * @return the PipeInfo whose name equals {@code pipeName}
+  * @throws PipeNotExistException if that PIPE is absent or has a different name
+  */
+ public PipeInfo getPipeInfo(String pipeName) throws PipeNotExistException {
+   final PipeInfo running = syncMetadata.getRunningPipeInfo();
+   if (running != null && running.getPipeName().equals(pipeName)) {
+     return running;
+   }
+   throw new PipeNotExistException(pipeName);
+ }
+
+ // endregion
+
// ======================================================
// region Implement of Snapshot
// ======================================================
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 6862be0a4d..44d6549f96 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -83,6 +83,9 @@ public class ConfigNodeProcedureEnv {
/** add or remove node lock */
private final LockQueue nodeLock = new LockQueue();
+ /** pipe operation lock */
+ private final LockQueue pipeLock = new LockQueue();
+
private final ReentrantLock schedulerLock = new ReentrantLock();
private final ConfigManager configManager;
@@ -551,6 +554,10 @@ public class ConfigNodeProcedureEnv {
return nodeLock;
}
+ /**
+  * Accessor for the pipe operation lock.
+  *
+  * @return the {@link LockQueue} held in the {@code pipeLock} field
+  */
+ public LockQueue getPipeLock() {
+   return this.pipeLock;
+ }
+
public ProcedureScheduler getScheduler() {
return scheduler;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/AbstractOperatePipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/AbstractOperatePipeProcedure.java
new file mode 100644
index 0000000000..81b576419d
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/AbstractOperatePipeProcedure.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.procedure.impl.sync;
+
+import org.apache.iotdb.commons.exception.sync.PipeException;
+import org.apache.iotdb.commons.sync.pipe.SyncOperation;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.StateMachineProcedure;
+import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
+import org.apache.iotdb.confignode.procedure.state.sync.OperatePipeState;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+ /** This procedure manages four kinds of PIPE operations: CREATE, START, STOP and DROP */
+ abstract class AbstractOperatePipeProcedure
+     extends StateMachineProcedure<ConfigNodeProcedureEnv, OperatePipeState> {
+
+   private static final Logger LOGGER = LoggerFactory.getLogger(AbstractOperatePipeProcedure.class);
+
+   /** Cycle count above which an error in a state without rollback support becomes a failure. */
+   private static final int RETRY_THRESHOLD = 3;
+
+   /**
+    * Execute at state OPERATE_CHECK
+    *
+    * @return true if procedure can finish directly
+    */
+   abstract boolean executeCheckCanSkip(ConfigNodeProcedureEnv env) throws PipeException;
+
+   /** Execute at state PRE_OPERATE_PIPE_CONFIGNODE */
+   abstract void executePreOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException;
+
+   /** Execute at state OPERATE_PIPE_DATANODE */
+   abstract void executeOperatePipeOnDataNode(ConfigNodeProcedureEnv env) throws PipeException;
+
+   /** Execute at state OPERATE_PIPE_CONFIGNODE */
+   abstract void executeOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException;
+
+   /** @return the operation this procedure performs; used in log and failure messages */
+   abstract SyncOperation getOperation();
+
+   /**
+    * Run the step that belongs to {@code state} and select the next state.
+    *
+    * <p>The states are visited in the order OPERATE_CHECK, PRE_OPERATE_PIPE_CONFIGNODE,
+    * OPERATE_PIPE_DATANODE, OPERATE_PIPE_CONFIGNODE. A {@link PipeException} thrown by a step is
+    * passed to {@link #handlePipeException}.
+    *
+    * @return {@code NO_MORE_STATE} when the check reports that the procedure can be skipped or
+    *     after the last step succeeded, otherwise {@code HAS_MORE_STATE}
+    */
+   @Override
+   protected Flow executeFromState(ConfigNodeProcedureEnv env, OperatePipeState state)
+       throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+     try {
+       switch (state) {
+         case OPERATE_CHECK:
+           if (executeCheckCanSkip(env)) {
+             return Flow.NO_MORE_STATE;
+           }
+           setNextState(OperatePipeState.PRE_OPERATE_PIPE_CONFIGNODE);
+           return Flow.HAS_MORE_STATE;
+         case PRE_OPERATE_PIPE_CONFIGNODE:
+           executePreOperatePipeOnConfigNode(env);
+           setNextState(OperatePipeState.OPERATE_PIPE_DATANODE);
+           return Flow.HAS_MORE_STATE;
+         case OPERATE_PIPE_DATANODE:
+           executeOperatePipeOnDataNode(env);
+           setNextState(OperatePipeState.OPERATE_PIPE_CONFIGNODE);
+           return Flow.HAS_MORE_STATE;
+         case OPERATE_PIPE_CONFIGNODE:
+           executeOperatePipeOnConfigNode(env);
+           return Flow.NO_MORE_STATE;
+         default:
+           return Flow.HAS_MORE_STATE;
+       }
+     } catch (PipeException e) {
+       handlePipeException(state, e);
+       return Flow.HAS_MORE_STATE;
+     }
+   }
+
+   /**
+    * React to a failed step: fail at once when the state supports rollback, otherwise only fail
+    * after the cycle count has exceeded {@link #RETRY_THRESHOLD}.
+    */
+   private void handlePipeException(OperatePipeState state, PipeException e) {
+     if (isRollbackSupported(state)) {
+       LOGGER.error("Fail in OperatePipeProcedure", e);
+       setFailure(new ProcedureException(e.getMessage()));
+       return;
+     }
+     LOGGER.error("Retrievable error trying to {} at state [{}]", getOperation(), state, e);
+     if (getCycles() > RETRY_THRESHOLD) {
+       setFailure(
+           new ProcedureException(
+               String.format("Fail to %s because %s", getOperation().name(), e.getMessage())));
+     }
+   }
+
+   /**
+    * Try to take the pipe lock while holding the scheduler lock; register this procedure as a
+    * waiter when the pipe lock is not available.
+    */
+   @Override
+   protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv env) {
+     env.getSchedulerLock().lock();
+     try {
+       if (!env.getPipeLock().tryLock(this)) {
+         env.getPipeLock().waitProcedure(this);
+         LOGGER.info("procedureId {} wait for lock.", getProcId());
+         return ProcedureLockState.LOCK_EVENT_WAIT;
+       }
+       LOGGER.info("procedureId {} acquire lock.", getProcId());
+       return ProcedureLockState.LOCK_ACQUIRED;
+     } finally {
+       env.getSchedulerLock().unlock();
+     }
+   }
+
+   /**
+    * Release the pipe lock while holding the scheduler lock and, when the release reports true,
+    * wake the procedures waiting on it.
+    */
+   @Override
+   protected void releaseLock(ConfigNodeProcedureEnv env) {
+     env.getSchedulerLock().lock();
+     try {
+       LOGGER.info("procedureId {} release lock.", getProcId());
+       final boolean released = env.getPipeLock().releaseLock(this);
+       if (released) {
+         env.getPipeLock().wakeWaitingProcedures(env.getScheduler());
+       }
+     } finally {
+       env.getSchedulerLock().unlock();
+     }
+   }
+
+   /** States are identified by their position in {@link OperatePipeState#values()}. */
+   @Override
+   protected OperatePipeState getState(int stateId) {
+     return OperatePipeState.values()[stateId];
+   }
+
+   /** Inverse of {@link #getState(int)}: the ordinal of {@code state}. */
+   @Override
+   protected int getStateId(OperatePipeState state) {
+     return state.ordinal();
+   }
+
+   @Override
+   protected OperatePipeState getInitialState() {
+     return OperatePipeState.OPERATE_CHECK;
+   }
+ }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/CreatePipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/CreatePipeProcedure.java
new file mode 100644
index 0000000000..25862974ac
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/CreatePipeProcedure.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.procedure.impl.sync;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.sync.PipeException;
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.commons.sync.pipe.PipeStatus;
+import org.apache.iotdb.commons.sync.pipe.SyncOperation;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.state.sync.OperatePipeState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
+import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+ public class CreatePipeProcedure extends AbstractOperatePipeProcedure {
+   private static final Logger LOGGER = LoggerFactory.getLogger(CreatePipeProcedure.class);
+
+   /** PIPE being created; set by the TPipeInfo constructor or by {@link #deserialize}. */
+   private PipeInfo pipeInfo;
+
+   /** No-arg constructor; leaves {@code pipeInfo} unset until {@link #deserialize} is called. */
+   public CreatePipeProcedure() {
+     super();
+   }
+
+   /**
+    * @param pipeInfo thrift description of the PIPE; parsed together with the current system time
+    * @throws PipeException if the thrift description cannot be parsed
+    */
+   public CreatePipeProcedure(TPipeInfo pipeInfo) throws PipeException {
+     super();
+     this.pipeInfo = SyncPipeUtil.parseTPipeInfoAsPipeInfo(pipeInfo, System.currentTimeMillis());
+   }
+
+   /** Runs the add-pipe check; creation is never skipped, so this always returns false. */
+   @Override
+   boolean executeCheckCanSkip(ConfigNodeProcedureEnv env) throws PipeException {
+     LOGGER.info("Start to create PIPE [{}]", pipeInfo.getPipeName());
+     env.getConfigManager().getSyncManager().checkAddPipe(pipeInfo);
+     return false;
+   }
+
+   @Override
+   void executePreOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
+     LOGGER.info("Start to pre-create PIPE [{}] on Config Nodes", pipeInfo.getPipeName());
+     requireSuccess(env.getConfigManager().getSyncManager().preCreatePipe(pipeInfo));
+   }
+
+   @Override
+   void executeOperatePipeOnDataNode(ConfigNodeProcedureEnv env) throws PipeException {
+     LOGGER.info("Start to broadcast create PIPE [{}] on Data Nodes", pipeInfo.getPipeName());
+     final TSStatus squashed =
+         RpcUtils.squashResponseStatusList(
+             env.getConfigManager().getSyncManager().preCreatePipeOnDataNodes(pipeInfo));
+     if (squashed.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+       return;
+     }
+     throw new PipeException(
+         String.format(
+             "Fail to create PIPE [%s] on Data Nodes because %s",
+             pipeInfo.getPipeName(), squashed.getMessage()));
+   }
+
+   /** Finishes creation by recording the PIPE with status {@link PipeStatus#STOP}. */
+   @Override
+   void executeOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
+     LOGGER.info("Start to create PIPE [{}] on Config Nodes", pipeInfo.getPipeName());
+     requireSuccess(
+         env.getConfigManager()
+             .getSyncManager()
+             .setPipeStatus(pipeInfo.getPipeName(), PipeStatus.STOP));
+   }
+
+   /** Throws a {@link PipeException} carrying the status message unless the status is a success. */
+   private static void requireSuccess(TSStatus status) throws PipeException {
+     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+       throw new PipeException(status.getMessage());
+     }
+   }
+
+   @Override
+   SyncOperation getOperation() {
+     return SyncOperation.CREATE_PIPE;
+   }
+
+   /** Rollback is supported for OPERATE_CHECK and OPERATE_PIPE_DATANODE only. */
+   @Override
+   protected boolean isRollbackSupported(OperatePipeState state) {
+     switch (state) {
+       case OPERATE_CHECK:
+       case OPERATE_PIPE_DATANODE:
+         return true;
+       default:
+         return false;
+     }
+   }
+
+   @Override
+   protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeState state)
+       throws IOException, InterruptedException, ProcedureException {
+     LOGGER.error("Roll back CreatePipeProcedure at STATE [{}]", state);
+     // TODO(sync): roll back logic;
+   }
+
+   /** Layout: procedure type ordinal, then the superclass state, then the PipeInfo. */
+   @Override
+   public void serialize(DataOutputStream stream) throws IOException {
+     stream.writeInt(ProcedureFactory.ProcedureType.CREATE_PIPE_PROCEDURE.ordinal());
+     super.serialize(stream);
+     pipeInfo.serialize(stream);
+   }
+
+   /** Reads the superclass state followed by the PipeInfo; no type ordinal is read here. */
+   @Override
+   public void deserialize(ByteBuffer byteBuffer) {
+     super.deserialize(byteBuffer);
+     pipeInfo = PipeInfo.deserializePipeInfo(byteBuffer);
+   }
+
+   /** Two instances are equal when they are of the same class and hold equal PipeInfo. */
+   @Override
+   public boolean equals(Object o) {
+     if (this == o) {
+       return true;
+     }
+     if (o == null || getClass() != o.getClass()) {
+       return false;
+     }
+     return Objects.equals(pipeInfo, ((CreatePipeProcedure) o).pipeInfo);
+   }
+
+   @Override
+   public int hashCode() {
+     return Objects.hash(pipeInfo);
+   }
+ }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/DropPipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/DropPipeProcedure.java
new file mode 100644
index 0000000000..54a1dfd37f
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/DropPipeProcedure.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.procedure.impl.sync;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.sync.PipeException;
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.commons.sync.pipe.PipeStatus;
+import org.apache.iotdb.commons.sync.pipe.SyncOperation;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.state.sync.OperatePipeState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+ // TODO(sync): drop logic needs to be updated
+ public class DropPipeProcedure extends AbstractOperatePipeProcedure {
+   private static final Logger LOGGER = LoggerFactory.getLogger(DropPipeProcedure.class);
+
+   /** Name of the PIPE to drop; set by the String constructor or by {@link #deserialize}. */
+   private String pipeName;
+
+   /** No-arg constructor; leaves {@code pipeName} unset until {@link #deserialize} is called. */
+   public DropPipeProcedure() {
+     super();
+   }
+
+   public DropPipeProcedure(String pipeName) throws PipeException {
+     super();
+     this.pipeName = pipeName;
+   }
+
+   /** The drop can be skipped when the PIPE already has status {@link PipeStatus#DROP}. */
+   @Override
+   boolean executeCheckCanSkip(ConfigNodeProcedureEnv env) throws PipeException {
+     LOGGER.info("Start to drop PIPE [{}]", pipeName);
+     final PipeInfo existing = env.getConfigManager().getSyncManager().getPipeInfo(pipeName);
+     return existing.getStatus().equals(PipeStatus.DROP);
+   }
+
+   @Override
+   void executePreOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
+     LOGGER.info("Start to pre-drop PIPE [{}] on Config Nodes", pipeName);
+     requireSuccess(
+         env.getConfigManager().getSyncManager().setPipeStatus(pipeName, PipeStatus.PREPARE_DROP));
+   }
+
+   @Override
+   void executeOperatePipeOnDataNode(ConfigNodeProcedureEnv env) throws PipeException {
+     LOGGER.info("Start to broadcast drop PIPE [{}] on Data Nodes", pipeName);
+     final TSStatus squashed =
+         RpcUtils.squashResponseStatusList(
+             env.getConfigManager()
+                 .getSyncManager()
+                 .operatePipeOnDataNodes(pipeName, SyncOperation.DROP_PIPE));
+     if (squashed.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+       return;
+     }
+     throw new PipeException(
+         String.format(
+             "Fail to drop PIPE [%s] on Data Nodes because %s", pipeName, squashed.getMessage()));
+   }
+
+   @Override
+   void executeOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
+     LOGGER.info("Start to drop PIPE [{}] on Config Nodes", pipeName);
+     requireSuccess(
+         env.getConfigManager().getSyncManager().setPipeStatus(pipeName, PipeStatus.DROP));
+   }
+
+   /** Throws a {@link PipeException} carrying the status message unless the status is a success. */
+   private static void requireSuccess(TSStatus status) throws PipeException {
+     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+       throw new PipeException(status.getMessage());
+     }
+   }
+
+   @Override
+   SyncOperation getOperation() {
+     return SyncOperation.DROP_PIPE;
+   }
+
+   /** Intentionally empty: this procedure performs no rollback work. */
+   @Override
+   protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeState state)
+       throws IOException, InterruptedException, ProcedureException {}
+
+   /** Layout: procedure type ordinal, then the superclass state, then the PIPE name. */
+   @Override
+   public void serialize(DataOutputStream stream) throws IOException {
+     stream.writeInt(ProcedureFactory.ProcedureType.DROP_PIPE_PROCEDURE.ordinal());
+     super.serialize(stream);
+     ReadWriteIOUtils.write(pipeName, stream);
+   }
+
+   /** Reads the superclass state followed by the PIPE name; no type ordinal is read here. */
+   @Override
+   public void deserialize(ByteBuffer byteBuffer) {
+     super.deserialize(byteBuffer);
+     pipeName = ReadWriteIOUtils.readString(byteBuffer);
+   }
+
+   /** Two instances are equal when they are of the same class and name the same PIPE. */
+   @Override
+   public boolean equals(Object o) {
+     if (this == o) {
+       return true;
+     }
+     if (o == null || getClass() != o.getClass()) {
+       return false;
+     }
+     return Objects.equals(pipeName, ((DropPipeProcedure) o).pipeName);
+   }
+
+   @Override
+   public int hashCode() {
+     return Objects.hash(pipeName);
+   }
+ }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java
new file mode 100644
index 0000000000..7061443503
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.procedure.impl.sync;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.sync.PipeException;
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.commons.sync.pipe.PipeStatus;
+import org.apache.iotdb.commons.sync.pipe.SyncOperation;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.state.sync.OperatePipeState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+ public class StartPipeProcedure extends AbstractOperatePipeProcedure {
+   private static final Logger LOGGER = LoggerFactory.getLogger(StartPipeProcedure.class);
+
+   /** Name of the PIPE to start; set by the String constructor or by {@link #deserialize}. */
+   private String pipeName;
+
+   /** No-arg constructor; leaves {@code pipeName} unset until {@link #deserialize} is called. */
+   public StartPipeProcedure() {
+     super();
+   }
+
+   public StartPipeProcedure(String pipeName) throws PipeException {
+     super();
+     this.pipeName = pipeName;
+   }
+
+   /** The start can be skipped when the PIPE already has status {@link PipeStatus#RUNNING}. */
+   @Override
+   boolean executeCheckCanSkip(ConfigNodeProcedureEnv env) throws PipeException {
+     LOGGER.info("Start to start PIPE [{}]", pipeName);
+     final PipeInfo existing = env.getConfigManager().getSyncManager().getPipeInfo(pipeName);
+     return existing.getStatus().equals(PipeStatus.RUNNING);
+   }
+
+   @Override
+   void executePreOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
+     LOGGER.info("Start to pre-start PIPE [{}] on Config Nodes", pipeName);
+     requireSuccess(
+         env.getConfigManager().getSyncManager().setPipeStatus(pipeName, PipeStatus.PREPARE_START));
+   }
+
+   @Override
+   void executeOperatePipeOnDataNode(ConfigNodeProcedureEnv env) throws PipeException {
+     LOGGER.info("Start to broadcast start PIPE [{}] on Data Nodes", pipeName);
+     final TSStatus squashed =
+         RpcUtils.squashResponseStatusList(
+             env.getConfigManager()
+                 .getSyncManager()
+                 .operatePipeOnDataNodes(pipeName, SyncOperation.START_PIPE));
+     if (squashed.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+       return;
+     }
+     throw new PipeException(
+         String.format(
+             "Fail to start PIPE [%s] on Data Nodes because %s", pipeName, squashed.getMessage()));
+   }
+
+   @Override
+   void executeOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
+     LOGGER.info("Start to start PIPE [{}] on Config Nodes", pipeName);
+     requireSuccess(
+         env.getConfigManager().getSyncManager().setPipeStatus(pipeName, PipeStatus.RUNNING));
+   }
+
+   /** Throws a {@link PipeException} carrying the status message unless the status is a success. */
+   private static void requireSuccess(TSStatus status) throws PipeException {
+     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+       throw new PipeException(status.getMessage());
+     }
+   }
+
+   @Override
+   SyncOperation getOperation() {
+     return SyncOperation.START_PIPE;
+   }
+
+   /** Intentionally empty: this procedure performs no rollback work. */
+   @Override
+   protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeState state)
+       throws IOException, InterruptedException, ProcedureException {}
+
+   /** Layout: procedure type ordinal, then the superclass state, then the PIPE name. */
+   @Override
+   public void serialize(DataOutputStream stream) throws IOException {
+     stream.writeInt(ProcedureFactory.ProcedureType.START_PIPE_PROCEDURE.ordinal());
+     super.serialize(stream);
+     ReadWriteIOUtils.write(pipeName, stream);
+   }
+
+   /** Reads the superclass state followed by the PIPE name; no type ordinal is read here. */
+   @Override
+   public void deserialize(ByteBuffer byteBuffer) {
+     super.deserialize(byteBuffer);
+     pipeName = ReadWriteIOUtils.readString(byteBuffer);
+   }
+
+   /** Two instances are equal when they are of the same class and name the same PIPE. */
+   @Override
+   public boolean equals(Object o) {
+     if (this == o) {
+       return true;
+     }
+     if (o == null || getClass() != o.getClass()) {
+       return false;
+     }
+     return Objects.equals(pipeName, ((StartPipeProcedure) o).pipeName);
+   }
+
+   @Override
+   public int hashCode() {
+     return Objects.hash(pipeName);
+   }
+ }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StopPipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StopPipeProcedure.java
new file mode 100644
index 0000000000..086811f523
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StopPipeProcedure.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.procedure.impl.sync;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.sync.PipeException;
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.commons.sync.pipe.PipeStatus;
+import org.apache.iotdb.commons.sync.pipe.SyncOperation;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.state.sync.OperatePipeState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * Stop-specific steps for {@link AbstractOperatePipeProcedure}: a skip check, a pre-stop status
+ * update, a broadcast of {@link SyncOperation#STOP_PIPE} to the Data Nodes and the final status
+ * update to {@link PipeStatus#STOP}.
+ *
+ * <p>The order in which these steps run is decided by the superclass, not by this class.
+ */
+public class StopPipeProcedure extends AbstractOperatePipeProcedure {
+  private static final Logger LOGGER = LoggerFactory.getLogger(StopPipeProcedure.class);
+
+  /** Name of the PIPE to stop; written by {@link #serialize} and read by {@link #deserialize}. */
+  private String pipeName;
+
+  /** Creates an instance without a PIPE name, e.g. for {@code ProcedureFactory}. */
+  public StopPipeProcedure() {
+    super();
+  }
+
+  /**
+   * Creates a procedure that stops the given PIPE.
+   *
+   * @param pipeName name of the PIPE to stop
+   * @throws PipeException declared for callers; nothing in this constructor throws it
+   */
+  public StopPipeProcedure(String pipeName) throws PipeException {
+    super();
+    this.pipeName = pipeName;
+  }
+
+  /**
+   * Reports whether the PIPE is already in {@link PipeStatus#STOP}.
+   *
+   * @return {@code true} when the status read from the sync manager equals {@code STOP}
+   */
+  @Override
+  boolean executeCheckCanSkip(ConfigNodeProcedureEnv env) throws PipeException {
+    LOGGER.info("Start to stop PIPE [{}]", pipeName);
+    final PipeInfo current = env.getConfigManager().getSyncManager().getPipeInfo(pipeName);
+    return current.getStatus().equals(PipeStatus.STOP);
+  }
+
+  /** Sets the PIPE status to {@link PipeStatus#PREPARE_STOP}; fails on a non-success status. */
+  @Override
+  void executePreOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
+    LOGGER.info("Start to pre-stop PIPE [{}] on Config Nodes", pipeName);
+    updateStatusOnConfigNode(env, PipeStatus.PREPARE_STOP);
+  }
+
+  /**
+   * Sends {@link SyncOperation#STOP_PIPE} to the Data Nodes and squashes their responses into one
+   * status.
+   *
+   * @throws PipeException if the squashed status code is not the success code
+   */
+  @Override
+  void executeOperatePipeOnDataNode(ConfigNodeProcedureEnv env) throws PipeException {
+    LOGGER.info("Start to broadcast stop PIPE [{}] on Data Nodes", pipeName);
+    final TSStatus merged =
+        RpcUtils.squashResponseStatusList(
+            env.getConfigManager()
+                .getSyncManager()
+                .operatePipeOnDataNodes(pipeName, SyncOperation.STOP_PIPE));
+    if (merged.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return;
+    }
+    throw new PipeException(
+        String.format(
+            "Fail to stop PIPE [%s] on Data Nodes because %s", pipeName, merged.getMessage()));
+  }
+
+  /** Sets the PIPE status to {@link PipeStatus#STOP}; fails on a non-success status. */
+  @Override
+  void executeOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
+    LOGGER.info("Start to stop PIPE [{}] on Config Nodes", pipeName);
+    updateStatusOnConfigNode(env, PipeStatus.STOP);
+  }
+
+  /**
+   * Asks the sync manager to set the PIPE status to {@code target}.
+   *
+   * @throws PipeException carrying the returned message when the status code is not success
+   */
+  private void updateStatusOnConfigNode(ConfigNodeProcedureEnv env, PipeStatus target)
+      throws PipeException {
+    final TSStatus result =
+        env.getConfigManager().getSyncManager().setPipeStatus(pipeName, target);
+    if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(result.getMessage());
+    }
+  }
+
+  /** Returns {@link SyncOperation#STOP_PIPE}. */
+  @Override
+  SyncOperation getOperation() {
+    return SyncOperation.STOP_PIPE;
+  }
+
+  /** No-op: this procedure performs no rollback work for any state. */
+  @Override
+  protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeState state)
+      throws IOException, InterruptedException, ProcedureException {}
+
+  /**
+   * Writes the ordinal of {@code STOP_PIPE_PROCEDURE}, then the superclass state, then the PIPE
+   * name.
+   */
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    final int typeCode = ProcedureFactory.ProcedureType.STOP_PIPE_PROCEDURE.ordinal();
+    stream.writeInt(typeCode);
+    super.serialize(stream);
+    ReadWriteIOUtils.write(pipeName, stream);
+  }
+
+  /** Reads the superclass state and then the PIPE name from {@code byteBuffer}. */
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    super.deserialize(byteBuffer);
+    this.pipeName = ReadWriteIOUtils.readString(byteBuffer);
+  }
+
+  /** Equal when the other object has exactly this class and the same PIPE name. */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    return Objects.equals(pipeName, ((StopPipeProcedure) o).pipeName);
+  }
+
+  /** Hash derived from the PIPE name only. */
+  @Override
+  public int hashCode() {
+    return Objects.hash(pipeName);
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeStatus.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/sync/OperatePipeState.java
similarity index 81%
copy from node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeStatus.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/sync/OperatePipeState.java
index 0d2a766f37..c8a263ec43 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeStatus.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/sync/OperatePipeState.java
@@ -16,11 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.commons.sync.pipe;
+package org.apache.iotdb.confignode.procedure.state.sync;
-public enum PipeStatus {
- // a new pipe should be stop status
- RUNNING,
- STOP,
- DROP
+/**
+ * State values used by the PIPE procedures (create/start/stop/drop).
+ *
+ * <p>NOTE(review): the names suggest one state per step method of the procedures (check,
+ * pre-operate on Config Nodes, operate on Data Nodes, operate on Config Nodes), executed in
+ * declaration order - confirm against AbstractOperatePipeProcedure.
+ */
+public enum OperatePipeState {
+ OPERATE_CHECK,
+ PRE_OPERATE_PIPE_CONFIGNODE,
+ OPERATE_PIPE_DATANODE,
+ OPERATE_PIPE_CONFIGNODE
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index 6951d3cb0e..a28eff1c8d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -29,6 +29,10 @@ import org.apache.iotdb.confignode.procedure.impl.statemachine.CreateRegionGroup
import org.apache.iotdb.confignode.procedure.impl.statemachine.DeleteStorageGroupProcedure;
import org.apache.iotdb.confignode.procedure.impl.statemachine.DeleteTimeSeriesProcedure;
import org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure;
+import org.apache.iotdb.confignode.procedure.impl.sync.CreatePipeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.sync.DropPipeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.sync.StartPipeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.sync.StopPipeProcedure;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,6 +81,18 @@ public class ProcedureFactory implements IProcedureFactory {
case DROP_TRIGGER_PROCEDURE:
procedure = new DropTriggerProcedure();
break;
+ case CREATE_PIPE_PROCEDURE:
+ procedure = new CreatePipeProcedure();
+ break;
+ case START_PIPE_PROCEDURE:
+ procedure = new StartPipeProcedure();
+ break;
+ case STOP_PIPE_PROCEDURE:
+ procedure = new StopPipeProcedure();
+ break;
+ case DROP_PIPE_PROCEDURE:
+ procedure = new DropPipeProcedure();
+ break;
default:
LOGGER.error("unknown Procedure type: " + typeNum);
throw new IOException("unknown Procedure type: " + typeNum);
@@ -104,6 +120,14 @@ public class ProcedureFactory implements IProcedureFactory {
return ProcedureType.CREATE_TRIGGER_PROCEDURE;
} else if (procedure instanceof DropTriggerProcedure) {
return ProcedureType.DROP_TRIGGER_PROCEDURE;
+ } else if (procedure instanceof CreatePipeProcedure) {
+ return ProcedureType.CREATE_PIPE_PROCEDURE;
+ } else if (procedure instanceof StartPipeProcedure) {
+ return ProcedureType.START_PIPE_PROCEDURE;
+ } else if (procedure instanceof StopPipeProcedure) {
+ return ProcedureType.STOP_PIPE_PROCEDURE;
+ } else if (procedure instanceof DropPipeProcedure) {
+ return ProcedureType.DROP_PIPE_PROCEDURE;
}
return null;
}
@@ -117,7 +141,11 @@ public class ProcedureFactory implements IProcedureFactory {
CREATE_REGION_GROUPS,
DELETE_TIMESERIES_PROCEDURE,
CREATE_TRIGGER_PROCEDURE,
- DROP_TRIGGER_PROCEDURE
+ DROP_TRIGGER_PROCEDURE,
+ CREATE_PIPE_PROCEDURE,
+ START_PIPE_PROCEDURE,
+ STOP_PIPE_PROCEDURE,
+ DROP_PIPE_PROCEDURE
}
private static class ProcedureFactoryHolder {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 5638835c5c..8381d6e2cc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -101,6 +101,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
@@ -116,6 +117,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TSetTimePartitionIntervalReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowStorageGroupResp;
@@ -620,6 +623,31 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
return configManager.getPipeSink(req);
}
+ /** Forwards the create-PIPE request unchanged to {@code configManager.createPipe}. */
+ @Override
+ public TSStatus createPipe(TPipeInfo req) {
+ return configManager.createPipe(req);
+ }
+
+ /** Forwards the PIPE name unchanged to {@code configManager.startPipe}. */
+ @Override
+ public TSStatus startPipe(String pipeName) {
+ return configManager.startPipe(pipeName);
+ }
+
+ /** Forwards the PIPE name unchanged to {@code configManager.stopPipe}. */
+ @Override
+ public TSStatus stopPipe(String pipeName) {
+ return configManager.stopPipe(pipeName);
+ }
+
+ /** Forwards the PIPE name unchanged to {@code configManager.dropPipe}. */
+ @Override
+ public TSStatus dropPipe(String pipeName) {
+ return configManager.dropPipe(pipeName);
+ }
+
+ /** Forwards the show-PIPE request unchanged to {@code configManager.showPipe}. */
+ @Override
+ public TShowPipeResp showPipe(TShowPipeReq req) {
+ return configManager.showPipe(req);
+ }
+
@Override
@TestOnly
public TGetRoutingResp getRouting(TGetRoutingReq req) {
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 92ad262529..ba23c19619 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -35,6 +35,9 @@ import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
import org.apache.iotdb.commons.partition.SeriesPartitionTable;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.commons.sync.pipe.PipeStatus;
+import org.apache.iotdb.commons.sync.pipe.TsFilePipeInfo;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
import org.apache.iotdb.confignode.consensus.request.read.CountStorageGroupPlan;
@@ -74,6 +77,9 @@ import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTimeP
import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
@@ -898,6 +904,42 @@ public class ConfigPhysicalPlanSerDeTest {
getPipeSinkPlanWithNullName1.getPipeSinkName());
}
+  /**
+   * Serializes a {@link PreCreatePipePlan} to a byte buffer, re-creates it through {@code
+   * ConfigPhysicalPlan.Factory} and checks that the carried {@link PipeInfo} survives unchanged.
+   */
+  @Test
+  public void PreCreatePipePlanTest() throws IOException {
+    PipeInfo pipeInfo =
+        new TsFilePipeInfo(
+            "name", "demo", PipeStatus.PREPARE_CREATE, System.currentTimeMillis(), 999, false);
+    // Locals are lowerCamelCase so they no longer obscure the PreCreatePipePlan class name.
+    PreCreatePipePlan preCreatePipePlan = new PreCreatePipePlan(pipeInfo);
+    PreCreatePipePlan preCreatePipePlan1 =
+        (PreCreatePipePlan)
+            ConfigPhysicalPlan.Factory.create(preCreatePipePlan.serializeToByteBuffer());
+    Assert.assertEquals(preCreatePipePlan.getPipeInfo(), preCreatePipePlan1.getPipeInfo());
+  }
+
+  /**
+   * Round-trips a {@link SetPipeStatusPlan} through its byte-buffer form and checks that the PIPE
+   * name and the status are both preserved.
+   */
+  @Test
+  public void SetPipeStatusPlan() throws IOException {
+    final SetPipeStatusPlan original = new SetPipeStatusPlan("pipe", PipeStatus.PREPARE_CREATE);
+    final SetPipeStatusPlan restored =
+        (SetPipeStatusPlan) ConfigPhysicalPlan.Factory.create(original.serializeToByteBuffer());
+    Assert.assertEquals(original.getPipeName(), restored.getPipeName());
+    Assert.assertEquals(original.getPipeStatus(), restored.getPipeStatus());
+  }
+
+  /**
+   * Round-trips {@link ShowPipePlan} through its byte-buffer form twice: once built with a PIPE
+   * name and once built with the no-argument constructor.
+   */
+  @Test
+  public void ShowPipePlanTest() throws IOException {
+    // Plan built with an explicit PIPE name.
+    final ShowPipePlan named = new ShowPipePlan("demo");
+    final ShowPipePlan namedRestored =
+        (ShowPipePlan) ConfigPhysicalPlan.Factory.create(named.serializeToByteBuffer());
+    Assert.assertEquals(named.getPipeName(), namedRestored.getPipeName());
+
+    // Plan built without a PIPE name.
+    final ShowPipePlan unnamed = new ShowPipePlan();
+    final ShowPipePlan unnamedRestored =
+        (ShowPipePlan) ConfigPhysicalPlan.Factory.create(unnamed.serializeToByteBuffer());
+    Assert.assertEquals(unnamed.getPipeName(), unnamedRestored.getPipeName());
+  }
+
@Test
public void GetTriggerTablePlan() throws IOException {
GetTriggerTablePlan getTriggerTablePlan0 = new GetTriggerTablePlan();
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/OperatePipeProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/OperatePipeProcedureTest.java
new file mode 100644
index 0000000000..16190ea3eb
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/OperatePipeProcedureTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.procedure.impl;
+
+import org.apache.iotdb.commons.exception.sync.PipeException;
+import org.apache.iotdb.confignode.procedure.impl.sync.CreatePipeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.sync.DropPipeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.sync.StartPipeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.sync.StopPipeProcedure;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class OperatePipeProcedureTest {
+
+ @Test
+ public void serializeDeserializeCreatePipeProcedureTest() throws PipeException {
+ PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put("syncdelop", "false");
+ TPipeInfo pipeInfo =
+ new TPipeInfo()
+ .setPipeName("PipeName")
+ .setPipeSinkName("PipeSinkName")
+ .setStartTime(999)
+ .setAttributes(attributes);
+
+ CreatePipeProcedure p1 = new CreatePipeProcedure(pipeInfo);
+
+ try {
+ p1.serialize(outputStream);
+ ByteBuffer buffer =
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+
+ CreatePipeProcedure p2 = (CreatePipeProcedure) ProcedureFactory.getInstance().create(buffer);
+ assertEquals(p1, p2);
+ } catch (Exception e) {
+ fail();
+ }
+ }
+
+ @Test
+ public void serializeDeserializeStartPipeProcedureTest() throws PipeException {
+ PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+ StartPipeProcedure p1 = new StartPipeProcedure("pipeName");
+
+ try {
+ p1.serialize(outputStream);
+ ByteBuffer buffer =
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+
+ StartPipeProcedure p2 = (StartPipeProcedure) ProcedureFactory.getInstance().create(buffer);
+ assertEquals(p1, p2);
+ } catch (Exception e) {
+ fail();
+ }
+ }
+
+ @Test
+ public void serializeDeserializeStopPipeProcedureTest() throws PipeException {
+ PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+ StopPipeProcedure p1 = new StopPipeProcedure("pipeName");
+
+ try {
+ p1.serialize(outputStream);
+ ByteBuffer buffer =
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+
+ StopPipeProcedure p2 = (StopPipeProcedure) ProcedureFactory.getInstance().create(buffer);
+ assertEquals(p1, p2);
+ } catch (Exception e) {
+ fail();
+ }
+ }
+
+ @Test
+ public void serializeDeserializeDropPipeProcedureTest() throws PipeException {
+ PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+ DropPipeProcedure p1 = new DropPipeProcedure("pipeName");
+
+ try {
+ p1.serialize(outputStream);
+ ByteBuffer buffer =
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+
+ DropPipeProcedure p2 = (DropPipeProcedure) ProcedureFactory.getInstance().create(buffer);
+ assertEquals(p1, p2);
+ } catch (Exception e) {
+ fail();
+ }
+ }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeIT.java
index 106e837a23..ad4ff7a3b9 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeIT.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.it.sync;
import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.junit.AfterClass;
@@ -37,7 +38,7 @@ import java.sql.Statement;
import static org.apache.iotdb.db.it.utils.TestUtils.assertResultSetEqual;
@RunWith(IoTDBTestRunner.class)
-@Category({LocalStandaloneIT.class})
+@Category({LocalStandaloneIT.class, ClusterIT.class})
public class IoTDBPipeIT {
private static String ip;
private static int port;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java b/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeNotExistException.java
similarity index 68%
copy from node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeNotExistException.java
index d2bfdf743b..3edb2a1463 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeNotExistException.java
@@ -16,13 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.commons.sync.pipe;
+package org.apache.iotdb.commons.exception.sync;
-public enum SyncOperation {
- CREATE_PIPESINK,
- DROP_PIPESINK,
- CREATE_PIPE,
- START_PIPE,
- STOP_PIPE,
- DROP_PIPE
+/** {@link PipeException} whose message states that the named PIPE does not exist. */
+public class PipeNotExistException extends PipeException {
+  /**
+   * @param pipeName name inserted into the message
+   * @param errorCode value forwarded to {@code PipeException(String, int)}
+   */
+  public PipeNotExistException(String pipeName, int errorCode) {
+    super(describe(pipeName), errorCode);
+  }
+
+  /** @param pipeName name inserted into the message */
+  public PipeNotExistException(String pipeName) {
+    super(describe(pipeName));
+  }
+
+  /** Builds the message shared by both constructors. */
+  private static String describe(String pipeName) {
+    return String.format("PIPE [%s] does not exist", pipeName);
+  }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/metadata/SyncMetadata.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/metadata/SyncMetadata.java
index 57043817f7..d8079dd217 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/metadata/SyncMetadata.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/metadata/SyncMetadata.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.commons.sync.metadata;
import org.apache.iotdb.commons.exception.sync.PipeException;
+import org.apache.iotdb.commons.exception.sync.PipeNotExistException;
import org.apache.iotdb.commons.exception.sync.PipeSinkException;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.commons.sync.persistence.SyncLogReader;
@@ -145,35 +146,30 @@ public class SyncMetadata implements SnapshotProcessor {
// region Implement of Pipe
// ======================================================
- public void addPipe(PipeInfo pipeInfo, PipeSink pipeSink) throws PipeException {
- // common check
+ /**
+ * Checks whether {@code pipeInfo} may be added; it only validates and changes no state.
+ *
+ * @param pipeInfo description of the PIPE that is about to be added
+ * @throws PipeException if the PIPESINK named by {@code pipeInfo} is unknown, or if there is
+ * already a running PIPE whose status is not {@code DROP}
+ */
+ public void checkAddPipe(PipeInfo pipeInfo) throws PipeException {
+ // check PipeSink exists
+ if (!isPipeSinkExist(pipeInfo.getPipeSinkName())) {
+ throw new PipeException(
+ String.format("can not find PIPESINK %s.", pipeInfo.getPipeSinkName()));
+ }
+ // check Pipe does not exist
if (runningPipe != null && runningPipe.getStatus() != PipeStatus.DROP) {
throw new PipeException(
String.format(
- "Pipe %s is %s, please retry after drop it.",
+ "PIPE %s is %s, please retry after drop it.",
runningPipe.getPipeName(), runningPipe.getStatus().name()));
}
+ }
+
+ /**
+ * Makes {@code pipeInfo} the running PIPE and records it in {@code pipes} under its name and
+ * create time, unless an entry for that name and create time is already present.
+ *
+ * <p>No validation happens here; {@code checkAddPipe} holds the checks.
+ */
+ public void addPipe(PipeInfo pipeInfo) {
runningPipe = pipeInfo;
pipes
.computeIfAbsent(runningPipe.getPipeName(), i -> new ConcurrentHashMap<>())
.computeIfAbsent(runningPipe.getCreateTime(), i -> runningPipe);
}
- public void operatePipe(String pipeName, SyncOperation syncOperation) throws PipeException {
- checkIfPipeExistAndRunning(pipeName);
- switch (syncOperation) {
- case START_PIPE:
- runningPipe.start();
- break;
- case STOP_PIPE:
- runningPipe.stop();
- break;
- case DROP_PIPE:
- runningPipe.drop();
- break;
- default:
- throw new PipeException("Unknown operatorType " + syncOperation);
- }
+ /**
+ * Sets the status of the current running PIPE to {@code status}.
+ *
+ * <p>NOTE(review): {@code pipeName} is not used and {@code runningPipe} is dereferenced without
+ * a null check, so callers presumably run {@code checkIfPipeExist(pipeName)} first - confirm.
+ * Nothing in this body throws the declared {@link PipeException}.
+ */
+ public void setPipeStatus(String pipeName, PipeStatus status) throws PipeException {
+ runningPipe.setStatus(status);
}
public PipeInfo getPipeInfo(String pipeName, long createTime) {
@@ -193,14 +189,14 @@ public class SyncMetadata implements SnapshotProcessor {
return runningPipe;
}
- private void checkIfPipeExistAndRunning(String pipeName) throws PipeException {
+ /**
+ * Verifies that the running PIPE exists, is not dropped and is named {@code pipeName}.
+ *
+ * @throws PipeNotExistException if there is no running PIPE or its status is {@code DROP}
+ * @throws PipeException if the running PIPE has a different name than {@code pipeName}
+ */
+ public void checkIfPipeExist(String pipeName) throws PipeException {
if (runningPipe == null || runningPipe.getStatus() == PipeStatus.DROP) {
- throw new PipeException("There is no existing pipe.");
+ throw new PipeNotExistException(pipeName);
}
if (!runningPipe.getPipeName().equals(pipeName)) {
throw new PipeException(
String.format(
- "Pipe %s is %s, please retry after drop it.",
+ "PIPE %s is %s, please retry after drop it.",
runningPipe.getPipeName(), runningPipe.getStatus()));
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogReader.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogReader.java
index 2f402caf6d..870b35cbba 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogReader.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogReader.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.commons.sync.persistence;
import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.commons.sync.pipe.PipeStatus;
import org.apache.iotdb.commons.sync.pipe.SyncOperation;
import org.apache.iotdb.commons.sync.pipesink.PipeSink;
import org.apache.iotdb.commons.sync.utils.SyncConstant;
@@ -98,17 +99,17 @@ public class SyncLogReader {
case STOP_PIPE:
// TODO: support multiple pipe
ReadWriteIOUtils.readString(inputStream);
- runningPipe.stop();
+ runningPipe.setStatus(PipeStatus.STOP);
break;
case START_PIPE:
// TODO: support multiple pipe
ReadWriteIOUtils.readString(inputStream);
- runningPipe.start();
+ runningPipe.setStatus(PipeStatus.RUNNING);
break;
case DROP_PIPE:
// TODO: support multiple pipe
ReadWriteIOUtils.readString(inputStream);
- runningPipe.drop();
+ runningPipe.setStatus(PipeStatus.DROP);
break;
default:
throw new UnsupportedOperationException(
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeInfo.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeInfo.java
index b93da6a86a..074315c716 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeInfo.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeInfo.java
@@ -18,13 +18,22 @@
*/
package org.apache.iotdb.commons.sync.pipe;
+import org.apache.iotdb.commons.exception.runtime.SerializationRunTimeException;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
public abstract class PipeInfo {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PipeInfo.class);
protected String pipeName;
protected String pipeSinkName;
@@ -85,18 +94,6 @@ public abstract class PipeInfo {
this.messageType = messageType;
}
- public void start() {
- this.status = PipeStatus.RUNNING;
- }
-
- public void stop() {
- this.status = PipeStatus.STOP;
- }
-
- public void drop() {
- this.status = PipeStatus.DROP;
- }
-
public long getCreateTime() {
return createTime;
}
@@ -105,6 +102,8 @@ public abstract class PipeInfo {
this.createTime = createTime;
}
+ public abstract TShowPipeInfo getTShowPipeInfo();
+
public void serialize(OutputStream outputStream) throws IOException {
ReadWriteIOUtils.write((byte) getType().ordinal(), outputStream);
ReadWriteIOUtils.write(pipeName, outputStream);
@@ -122,6 +121,25 @@ public abstract class PipeInfo {
messageType = PipeMessage.PipeMessageType.values()[ReadWriteIOUtils.readByte(inputStream)];
}
+ protected void deserialize(ByteBuffer byteBuffer) {
+ pipeName = ReadWriteIOUtils.readString(byteBuffer);
+ pipeSinkName = ReadWriteIOUtils.readString(byteBuffer);
+ status = PipeStatus.values()[ReadWriteIOUtils.readByte(byteBuffer)];
+ createTime = ReadWriteIOUtils.readLong(byteBuffer);
+ messageType = PipeMessage.PipeMessageType.values()[ReadWriteIOUtils.readByte(byteBuffer)];
+ }
+
+  /**
+   * Serializes this object with {@code serialize} into an in-memory stream and returns a buffer
+   * wrapping exactly the bytes written.
+   *
+   * @return buffer over the serialized bytes
+   * @throws SerializationRunTimeException wrapping any {@link IOException} raised while writing
+   */
+  public ByteBuffer serializeToByteBuffer() {
+    try (PublicBAOS publicBAOS = new PublicBAOS();
+        DataOutputStream dataOutputStream = new DataOutputStream(publicBAOS)) {
+      serialize(dataOutputStream);
+      return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+    } catch (IOException e) {
+      // Pass the exception to the logger so the log entry carries the cause and stack trace.
+      LOGGER.error("Unexpected error occurred when serializing PipeInfo.", e);
+      throw new SerializationRunTimeException(e);
+    }
+  }
+
public static PipeInfo deserializePipeInfo(InputStream inputStream) throws IOException {
PipeType pipeType = PipeType.values()[ReadWriteIOUtils.readByte(inputStream)];
PipeInfo pipeInfo;
@@ -138,6 +156,22 @@ public abstract class PipeInfo {
return pipeInfo;
}
+  /**
+   * Reads one {@link PipeInfo} from {@code byteBuffer}: a leading byte selects the {@code
+   * PipeType} by ordinal, and the remaining bytes are handed to the matching subclass.
+   *
+   * @return the deserialized {@link TsFilePipeInfo}
+   * @throws UnsupportedOperationException for every pipe type other than {@code TsFilePipe}
+   */
+  public static PipeInfo deserializePipeInfo(ByteBuffer byteBuffer) {
+    final PipeType pipeType = PipeType.values()[ReadWriteIOUtils.readByte(byteBuffer)];
+    if (pipeType != PipeType.TsFilePipe) {
+      // WALPipe and any other type are rejected.
+      throw new UnsupportedOperationException(
+          String.format("Can not recognize PipeType %s.", pipeType.name()));
+    }
+    final PipeInfo pipeInfo = new TsFilePipeInfo();
+    pipeInfo.deserialize(byteBuffer);
+    return pipeInfo;
+  }
+
enum PipeType {
TsFilePipe,
WALPipe
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeStatus.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeStatus.java
index 0d2a766f37..09f92c4c25 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeStatus.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeStatus.java
@@ -22,5 +22,10 @@ public enum PipeStatus {
// a new pipe should be stop status
RUNNING,
STOP,
- DROP
+ DROP,
+ // internal status
+ PREPARE_CREATE,
+ PREPARE_START,
+ PREPARE_STOP,
+ PREPARE_DROP
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
index d2bfdf743b..ec935e4c94 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
@@ -19,8 +19,10 @@
package org.apache.iotdb.commons.sync.pipe;
public enum SyncOperation {
+ // PIPESINK
CREATE_PIPESINK,
DROP_PIPESINK,
+ // PIPE
CREATE_PIPE,
START_PIPE,
STOP_PIPE,
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/TsFilePipeInfo.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/TsFilePipeInfo.java
index 391d4c65fd..288cb8f80d 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/TsFilePipeInfo.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/TsFilePipeInfo.java
@@ -18,11 +18,13 @@
*/
package org.apache.iotdb.commons.sync.pipe;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.Objects;
public class TsFilePipeInfo extends PipeInfo {
@@ -76,6 +78,12 @@ public class TsFilePipeInfo extends PipeInfo {
return PipeType.TsFilePipe;
}
+  /**
+   * Builds the thrift view of this PIPE from its create time, name, the literal {@code "sender"},
+   * its pipe sink name, and the names of its status and message type.
+   *
+   * <p>NOTE(review): {@code "sender"} presumably fills the role field of {@code TShowPipeInfo} -
+   * confirm against the thrift definition.
+   */
+  @Override
+  public TShowPipeInfo getTShowPipeInfo() {
+    final String role = "sender";
+    final String statusName = status.name();
+    final String messageTypeName = messageType.name();
+    return new TShowPipeInfo(
+        createTime, pipeName, role, pipeSinkName, statusName, messageTypeName);
+  }
+
@Override
public void serialize(OutputStream outputStream) throws IOException {
super.serialize(outputStream);
@@ -90,6 +98,13 @@ public class TsFilePipeInfo extends PipeInfo {
dataStartTimestamp = ReadWriteIOUtils.readLong(inputStream);
}
+ @Override
+ protected void deserialize(ByteBuffer buffer) {
+ super.deserialize(buffer);
+ syncDelOp = ReadWriteIOUtils.readBool(buffer);
+ dataStartTimestamp = ReadWriteIOUtils.readLong(buffer);
+ }
+
@Override
public String toString() {
return "TsFilePipeInfo{"
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/IoTDBPipeSink.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/IoTDBPipeSink.java
index b0cbf2aef6..4ac32c6899 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/IoTDBPipeSink.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/IoTDBPipeSink.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -143,6 +144,13 @@ public class IoTDBPipeSink implements PipeSink {
port = ReadWriteIOUtils.readInt(inputStream);
}
+ @Override
+ public void deserialize(ByteBuffer buffer) {
+ name = ReadWriteIOUtils.readString(buffer);
+ ip = ReadWriteIOUtils.readString(buffer);
+ port = ReadWriteIOUtils.readInt(buffer);
+ }
+
@Override
public String toString() {
return "IoTDBPipeSink{"
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/PipeSink.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/PipeSink.java
index 588618d96e..d79aae750c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/PipeSink.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/PipeSink.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
@@ -49,6 +50,8 @@ public interface PipeSink {
void deserialize(InputStream inputStream) throws IOException;
+ void deserialize(ByteBuffer buffer);
+
static PipeSink deserializePipeSink(InputStream inputStream) throws IOException {
PipeSinkType pipeSinkType = PipeSinkType.values()[ReadWriteIOUtils.readByte(inputStream)];
PipeSink pipeSink;
@@ -66,6 +69,23 @@ public interface PipeSink {
return pipeSink;
}
+ static PipeSink deserializePipeSink(ByteBuffer buffer) {
+ PipeSinkType pipeSinkType = PipeSinkType.values()[ReadWriteIOUtils.readByte(buffer)];
+ PipeSink pipeSink;
+ switch (pipeSinkType) {
+ case IoTDB:
+ pipeSink = new IoTDBPipeSink();
+ pipeSink.deserialize(buffer);
+ break;
+ case ExternalPipe:
+ // TODO(ext-pipe): deserialize external pipesink here
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Can not recognize PipeSinkType %s.", pipeSinkType.name()));
+ }
+ return pipeSink;
+ }
+
enum PipeSinkType {
IoTDB,
ExternalPipe
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
index 53e089dad8..36ba2d3415 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.commons.utils;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
@@ -136,7 +137,7 @@ public class ThriftConfigNodeSerDeUtils {
try {
pipeSinkInfo.write(generateWriteProtocol(stream));
} catch (TException e) {
- throw new ThriftSerDeException("Write TConfigNodeLocation failed: ", e);
+ throw new ThriftSerDeException("Write TPipeSinkInfo failed: ", e);
}
}
@@ -145,8 +146,26 @@ public class ThriftConfigNodeSerDeUtils {
try {
pipeSinkInfo.read(generateReadProtocol(buffer));
} catch (TException e) {
- throw new ThriftSerDeException("Read TConfigNodeLocation failed: ", e);
+ throw new ThriftSerDeException("Read TPipeSinkInfo failed: ", e);
}
return pipeSinkInfo;
}
+
+ public static void serializeTPipeInfo(TPipeInfo pipeInfo, DataOutputStream stream) {
+ try {
+ pipeInfo.write(generateWriteProtocol(stream));
+ } catch (TException e) {
+ throw new ThriftSerDeException("Write TPipeInfo failed: ", e);
+ }
+ }
+
+ public static TPipeInfo deserializeTPipeInfo(ByteBuffer buffer) {
+ TPipeInfo pipeInfo = new TPipeInfo();
+ try {
+ pipeInfo.read(generateReadProtocol(buffer));
+ } catch (TException e) {
+ throw new ThriftSerDeException("Read TPipeInfo failed: ", e);
+ }
+ return pipeInfo;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index f9dd77cc37..39fae8d887 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -70,6 +70,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
@@ -85,6 +86,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TSetTimePartitionIntervalReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowStorageGroupResp;
@@ -1075,6 +1078,86 @@ public class ConfigNodeClient
throw new TException(MSG_RECONNECTION_FAIL);
}
+ @Override
+ public TSStatus createPipe(TPipeInfo req) throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TSStatus status = client.createPipe(req);
+ if (!updateConfigNodeLeader(status)) {
+ return status;
+ }
+ } catch (TException e) {
+ configLeader = null;
+ }
+ reconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
+ @Override
+ public TSStatus startPipe(String pipeName) throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TSStatus status = client.startPipe(pipeName);
+ if (!updateConfigNodeLeader(status)) {
+ return status;
+ }
+ } catch (TException e) {
+ configLeader = null;
+ }
+ reconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
+ @Override
+ public TSStatus stopPipe(String pipeName) throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TSStatus status = client.stopPipe(pipeName);
+ if (!updateConfigNodeLeader(status)) {
+ return status;
+ }
+ } catch (TException e) {
+ configLeader = null;
+ }
+ reconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
+ @Override
+ public TSStatus dropPipe(String pipeName) throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TSStatus status = client.dropPipe(pipeName);
+ if (!updateConfigNodeLeader(status)) {
+ return status;
+ }
+ } catch (TException e) {
+ configLeader = null;
+ }
+ reconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
+ @Override
+ public TShowPipeResp showPipe(TShowPipeReq req) throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TShowPipeResp resp = client.showPipe(req);
+ if (!updateConfigNodeLeader(resp.getStatus())) {
+ return resp;
+ }
+ } catch (TException e) {
+ configLeader = null;
+ }
+ reconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
@Override
@TestOnly
public TGetRoutingResp getRouting(TGetRoutingReq req) throws TException {
diff --git a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
index bcbdd4d0ff..c0737f884e 100644
--- a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
@@ -55,7 +55,7 @@ import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.sync.pipesink.PipeSink;
import org.apache.iotdb.commons.utils.AuthUtils;
import org.apache.iotdb.commons.utils.StatusUtils;
-import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -95,6 +95,7 @@ import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
import org.apache.iotdb.db.rescon.MemTableManager;
import org.apache.iotdb.db.sync.SyncService;
+import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -1337,7 +1338,7 @@ public class LocalConfigNode {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
- public List<PipeSink> showPipeSink(String pipeSinkName) {
+ public List<PipeSink> showPipeSink(String pipeSinkName) throws PipeSinkException {
boolean showAll = StringUtils.isEmpty(pipeSinkName);
if (showAll) {
return syncService.getAllPipeSink();
@@ -1348,7 +1349,9 @@ public class LocalConfigNode {
public TSStatus createPipe(CreatePipeStatement createPipeStatement) {
try {
- syncService.addPipe(createPipeStatement);
+ syncService.addPipe(
+ SyncPipeUtil.parseCreatePipePlanAsPipeInfo(
+ createPipeStatement, System.currentTimeMillis()));
} catch (PipeException e) {
return RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, e.getMessage());
}
@@ -1383,7 +1386,7 @@ public class LocalConfigNode {
}
public TShowPipeResp showPipe(String pipeName) {
- List<TPipeInfo> pipeInfos = SyncService.getInstance().showPipe(pipeName);
+ List<TShowPipeInfo> pipeInfos = SyncService.getInstance().showPipe(pipeName);
return new TShowPipeResp().setPipeInfoList(pipeInfos).setStatus(StatusUtils.OK);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 33db99d1dc..47247a9121 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -43,11 +43,14 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowStorageGroupResp;
@@ -73,6 +76,7 @@ import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowNodes
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowPathSetTemplateTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowSchemaTemplateTask;
import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeSinkTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeTask;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTriggerStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
@@ -812,7 +816,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
TSStatus tsStatus = configNodeClient.createPipeSink(pipeSinkInfo);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.error(
- "Failed to create PipeSink {} with type {} in config node, status is {}.",
+ "Failed to create PIPESINK {} with type {} in config node, status is {}.",
createPipeSinkStatement.getPipeSinkName(),
createPipeSinkStatement.getPipeSinkType(),
tsStatus);
@@ -837,7 +841,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
TSStatus tsStatus = configNodeClient.dropPipeSink(req);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.error(
- "Failed to drop PipeSink {} in config node, status is {}.",
+ "Failed to drop PIPESINK {} in config node, status is {}.",
dropPipeSinkStatement.getPipeSinkName(),
tsStatus);
future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
@@ -871,50 +875,101 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
@Override
public SettableFuture<ConfigTaskResult> createPipe(CreatePipeStatement createPipeStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- future.setException(
- new IoTDBException(
- "Executing create pipe is not supported",
- TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
+ try (ConfigNodeClient configNodeClient =
+ CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+ TPipeInfo pipeInfo =
+ new TPipeInfo()
+ .setPipeName(createPipeStatement.getPipeName())
+ .setPipeSinkName(createPipeStatement.getPipeSinkName())
+ .setStartTime(createPipeStatement.getStartTime())
+ .setAttributes(createPipeStatement.getPipeAttributes());
+ TSStatus tsStatus = configNodeClient.createPipe(pipeInfo);
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
+ LOGGER.error(
+ "Failed to create PIPE {} in config node, status is {}.",
+ createPipeStatement.getPipeName(),
+ tsStatus);
+ future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
+ } else {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ }
+ } catch (Exception e) {
+ future.setException(e);
+ }
return future;
}
@Override
- public SettableFuture<ConfigTaskResult> dropPipe(DropPipeStatement dropPipeStatement) {
+ public SettableFuture<ConfigTaskResult> startPipe(StartPipeStatement startPipeStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- future.setException(
- new IoTDBException(
- "Executing drop pipe is not supported",
- TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
+ try (ConfigNodeClient configNodeClient =
+ CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+ TSStatus tsStatus = configNodeClient.startPipe(startPipeStatement.getPipeName());
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
+ LOGGER.error(
+ "Failed to start PIPE {}, status is {}.", startPipeStatement.getPipeName(), tsStatus);
+ future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
+ } else {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ }
+ } catch (Exception e) {
+ future.setException(e);
+ }
return future;
}
@Override
- public SettableFuture<ConfigTaskResult> showPipe(ShowPipeStatement showPipeStatement) {
+ public SettableFuture<ConfigTaskResult> dropPipe(DropPipeStatement dropPipeStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- future.setException(
- new IoTDBException(
- "Executing show pipe is not supported",
- TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
+ try (ConfigNodeClient configNodeClient =
+ CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+ TSStatus tsStatus = configNodeClient.dropPipe(dropPipeStatement.getPipeName());
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
+ LOGGER.error(
+ "Failed to drop PIPE {}, status is {}.", dropPipeStatement.getPipeName(), tsStatus);
+ future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
+ } else {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ }
+ } catch (Exception e) {
+ future.setException(e);
+ }
return future;
}
@Override
- public SettableFuture<ConfigTaskResult> startPipe(StartPipeStatement startPipeStatement) {
+ public SettableFuture<ConfigTaskResult> stopPipe(StopPipeStatement stopPipeStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- future.setException(
- new IoTDBException(
- "Executing Start pipe is not supported",
- TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
+ try (ConfigNodeClient configNodeClient =
+ CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+ TSStatus tsStatus = configNodeClient.stopPipe(stopPipeStatement.getPipeName());
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
+ LOGGER.error(
+ "Failed to stop PIPE {}, status is {}.", stopPipeStatement.getPipeName(), tsStatus);
+ future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
+ } else {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ }
+ } catch (Exception e) {
+ future.setException(e);
+ }
return future;
}
@Override
- public SettableFuture<ConfigTaskResult> stopPipe(StopPipeStatement stopPipeStatement) {
+ public SettableFuture<ConfigTaskResult> showPipe(ShowPipeStatement showPipeStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- future.setException(
- new IoTDBException(
- "Executing stop pipe is not supported",
- TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
+ try (ConfigNodeClient configNodeClient =
+ CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+ TShowPipeReq tShowPipeReq = new TShowPipeReq();
+ if (!StringUtils.isEmpty(showPipeStatement.getPipeName())) {
+ tShowPipeReq.setPipeName(showPipeStatement.getPipeName());
+ }
+ TShowPipeResp resp = configNodeClient.showPipe(tShowPipeReq);
+ ShowPipeTask.buildTSBlock(resp.getPipeInfoList(), future);
+ } catch (Exception e) {
+ future.setException(e);
+ }
return future;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
index 934cec4434..d2c4a0a333 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.exception.sync.PipeSinkException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.sync.pipesink.PipeSink;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
@@ -72,6 +73,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -510,9 +512,13 @@ public class StandaloneConfigTaskExecutor implements IConfigTaskExecutor {
public SettableFuture<ConfigTaskResult> showPipeSink(
ShowPipeSinkStatement showPipeSinkStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- List<PipeSink> pipeSinkList =
- LocalConfigNode.getInstance().showPipeSink(showPipeSinkStatement.getPipeSinkName());
- ShowPipeSinkTask.buildTSBlockByPipeSink(pipeSinkList, future);
+ try {
+ List<PipeSink> pipeSinkList =
+ LocalConfigNode.getInstance().showPipeSink(showPipeSinkStatement.getPipeSinkName());
+ ShowPipeSinkTask.buildTSBlockByPipeSink(pipeSinkList, future);
+ } catch (PipeSinkException e) {
+ ShowPipeSinkTask.buildTSBlockByPipeSink(Collections.emptyList(), future);
+ }
return future;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeSinkTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeSinkTask.java
index d78ac36e98..2f0965ab5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeSinkTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeSinkTask.java
@@ -83,7 +83,7 @@ public class ShowPipeSinkTask implements IConfigTask {
.collect(Collectors.toList());
TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
for (TPipeSinkInfo pipeSinkInfo : pipeSinkInfoList) {
- PipeSink pipeSink = SyncPipeUtil.parsePipeInfoAsPipe(pipeSinkInfo);
+ PipeSink pipeSink = SyncPipeUtil.parseTPipeSinkInfoAsPipeSink(pipeSinkInfo);
builder.getTimeColumnBuilder().writeLong(0L);
builder.getColumnBuilder(0).writeBinary(new Binary(pipeSink.getPipeSinkName()));
builder.getColumnBuilder(1).writeBinary(new Binary(pipeSink.getType().name()));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeTask.java
index 180cdd2d1d..5ecb7866c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeTask.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.mpp.plan.execution.config.sys.sync;
-import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
@@ -55,13 +55,13 @@ public class ShowPipeTask implements IConfigTask {
}
public static void buildTSBlock(
- List<TPipeInfo> pipeInfoList, SettableFuture<ConfigTaskResult> future) {
+ List<TShowPipeInfo> pipeInfoList, SettableFuture<ConfigTaskResult> future) {
List<TSDataType> outputDataTypes =
ColumnHeaderConstant.showPipeColumnHeaders.stream()
.map(ColumnHeader::getColumnType)
.collect(Collectors.toList());
TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
- for (TPipeInfo tPipeInfo : pipeInfoList) {
+ for (TShowPipeInfo tPipeInfo : pipeInfoList) {
builder.getTimeColumnBuilder().writeLong(0L);
builder
.getColumnBuilder(0)
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 00a275f1e2..1c12638403 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -159,6 +159,7 @@ import org.apache.iotdb.db.tools.TsFileSplitByPartitionTool;
import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
+import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
import org.apache.iotdb.db.wal.WALManager;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -2494,7 +2495,8 @@ public class PlanExecutor implements IPlanExecutor {
private void createPipe(CreatePipePlan plan) throws QueryProcessException {
try {
- SyncService.getInstance().addPipe(plan);
+ SyncService.getInstance()
+ .addPipe(SyncPipeUtil.parseCreatePipePlanAsPipeInfo(plan, System.currentTimeMillis()));
} catch (PipeException e) {
throw new QueryProcessException("Create pipe error.", e);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 33d8b9de14..63df1528e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -34,8 +34,11 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.exception.sync.PipeException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.commons.sync.pipe.SyncOperation;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
@@ -82,6 +85,7 @@ import org.apache.iotdb.db.service.RegionMigrateService;
import org.apache.iotdb.db.service.metrics.MetricService;
import org.apache.iotdb.db.service.metrics.enums.Metric;
import org.apache.iotdb.db.service.metrics.enums.Tag;
+import org.apache.iotdb.db.sync.SyncService;
import org.apache.iotdb.db.trigger.service.TriggerManagementService;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
@@ -97,6 +101,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreatePipeOnDataNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteTimeSeriesReq;
@@ -118,6 +123,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
import org.apache.iotdb.mpp.rpc.thrift.TLoadSample;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
+import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
@@ -505,6 +511,40 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
return RpcUtils.SUCCESS_STATUS;
}
+ @Override
+ public TSStatus createPipeOnDataNode(TCreatePipeOnDataNodeReq req) throws TException {
+ try {
+ PipeInfo pipeInfo = PipeInfo.deserializePipeInfo(req.pipeInfo);
+ SyncService.getInstance().addPipe(pipeInfo);
+ return RpcUtils.SUCCESS_STATUS;
+ } catch (PipeException e) {
+ return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
+ }
+ }
+
+ @Override
+ public TSStatus operatePipeOnDataNode(TOperatePipeOnDataNodeReq req) throws TException {
+ try {
+ switch (SyncOperation.values()[req.getOperation()]) {
+ case START_PIPE:
+ SyncService.getInstance().startPipe(req.getPipeName());
+ break;
+ case STOP_PIPE:
+ SyncService.getInstance().stopPipe(req.getPipeName());
+ break;
+ case DROP_PIPE:
+ SyncService.getInstance().dropPipe(req.getPipeName());
+ break;
+ default:
+ return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
+ .setMessage("Unsupported operation.");
+ }
+ return RpcUtils.SUCCESS_STATUS;
+ } catch (PipeException e) {
+ return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
+ }
+ }
+
private PathPatternTree filterPathPatternTree(PathPatternTree patternTree, String storageGroup) {
PathPatternTree filteredPatternTree = new PathPatternTree();
try {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
index 35a064c531..f4b9cd6ce4 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
@@ -30,18 +30,20 @@ import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.sync.pipe.PipeInfo;
import org.apache.iotdb.commons.sync.pipe.PipeMessage;
import org.apache.iotdb.commons.sync.pipe.PipeStatus;
+import org.apache.iotdb.commons.sync.pipe.TsFilePipeInfo;
import org.apache.iotdb.commons.sync.pipesink.PipeSink;
import org.apache.iotdb.commons.sync.utils.SyncConstant;
import org.apache.iotdb.commons.sync.utils.SyncPathUtil;
import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPipePlan;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
import org.apache.iotdb.db.query.dataset.ListDataSet;
+import org.apache.iotdb.db.sync.common.ClusterSyncInfoFetcher;
import org.apache.iotdb.db.sync.common.ISyncInfoFetcher;
import org.apache.iotdb.db.sync.common.LocalSyncInfoFetcher;
import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginManager;
@@ -75,19 +77,25 @@ import java.util.List;
public class SyncService implements IService {
private static final Logger logger = LoggerFactory.getLogger(SyncService.class);
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private Pipe runningPipe;
/* handle external Pipe */
private ExtPipePluginManager extPipePluginManager;
- private ISyncInfoFetcher syncInfoFetcher = LocalSyncInfoFetcher.getInstance();
+ private ISyncInfoFetcher syncInfoFetcher;
/* handle RPC on the receiver side */
private final ReceiverManager receiverManager;
private SyncService() {
receiverManager = new ReceiverManager();
+ if (config.isClusterMode()) {
+ syncInfoFetcher = ClusterSyncInfoFetcher.getInstance();
+ } else {
+ syncInfoFetcher = LocalSyncInfoFetcher.getInstance();
+ }
}
private static class SyncServiceHolder {
@@ -124,7 +132,7 @@ public class SyncService implements IService {
// region Interfaces and Implementation of PipeSink
- public PipeSink getPipeSink(String name) {
+ public PipeSink getPipeSink(String name) throws PipeSinkException {
return syncInfoFetcher.getPipeSink(name);
}
@@ -159,51 +167,35 @@ public class SyncService implements IService {
// region Interfaces and Implementation of Pipe
- // TODO(sync): delete this in new-standalone version
- public synchronized void addPipe(CreatePipePlan plan) throws PipeException {
- // check plan
+ public synchronized void addPipe(PipeInfo pipeInfo) throws PipeException {
+ logger.info("Execute CREATE PIPE {}", pipeInfo.getPipeName());
long currentTime = DatetimeUtils.currentTime();
- if (plan.getDataStartTimestamp() > currentTime) {
- throw new PipeException(
- String.format(
- "Start time %s is later than current time %s, this is not supported yet.",
- DatetimeUtils.convertLongToDate(plan.getDataStartTimestamp()),
- DatetimeUtils.convertLongToDate(currentTime)));
+ if (pipeInfo instanceof TsFilePipeInfo) {
+ // TODO(sync): move check logic to PipeInfo#validate()
+ // check statement
+ if (((TsFilePipeInfo) pipeInfo).getDataStartTimestamp() > currentTime) {
+ throw new PipeException(
+ String.format(
+ "Start time %s is later than current time %s, this is not supported yet.",
+ DatetimeUtils.convertLongToDate(
+ ((TsFilePipeInfo) pipeInfo).getDataStartTimestamp()),
+ DatetimeUtils.convertLongToDate(currentTime)));
+ }
}
// add pipe
- TSStatus status = syncInfoFetcher.addPipe(plan, currentTime);
+ TSStatus status = syncInfoFetcher.addPipe(pipeInfo);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(status.message);
}
-
- PipeSink runningPipeSink = getPipeSink(plan.getPipeSinkName());
- runningPipe = SyncPipeUtil.parseCreatePipePlanAsPipe(plan, runningPipeSink, currentTime);
- if (runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.ExternalPipe) {
- // for external pipe
- // == start ExternalPipeProcessor for send data to external pipe plugin
- startExternalPipeManager(false);
+ PipeSink runningPipeSink;
+ try {
+ runningPipeSink = getPipeSink(pipeInfo.getPipeSinkName());
+ } catch (PipeSinkException e) {
+ logger.error("failed to add PIPE because {}", e.getMessage(), e);
+ throw new PipeException(String.format("failed to add PIPE because %s", e.getMessage()));
}
- }
- public synchronized void addPipe(CreatePipeStatement statement) throws PipeException {
- logger.info("Execute CREATE PIPE {}", statement.getPipeName());
- // check statement
- long currentTime = DatetimeUtils.currentTime();
- if (statement.getStartTime() > currentTime) {
- throw new PipeException(
- String.format(
- "Start time %s is later than current time %s, this is not supported yet.",
- DatetimeUtils.convertLongToDate(statement.getStartTime()),
- DatetimeUtils.convertLongToDate(currentTime)));
- }
- // add pipe
- TSStatus status = syncInfoFetcher.addPipe(statement, currentTime);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new PipeException(status.message);
- }
-
- PipeSink runningPipeSink = getPipeSink(statement.getPipeSinkName());
- runningPipe = SyncPipeUtil.parseCreatePipePlanAsPipe(statement, runningPipeSink, currentTime);
+ runningPipe = SyncPipeUtil.parseTPipeSinkInfoAsPipeSink(pipeInfo, runningPipeSink);
if (runningPipe.getPipeSink().getType()
== PipeSink.PipeSinkType.ExternalPipe) { // for external pipe
// == start ExternalPipeProcessor for send data to external pipe plugin
@@ -212,7 +204,7 @@ public class SyncService implements IService {
}
public synchronized void stopPipe(String pipeName) throws PipeException {
- logger.info("Execute STOP PIPE {}", pipeName);
+ logger.info("Execute stop PIPE {}", pipeName);
checkRunningPipeExistAndName(pipeName);
if (runningPipe.getStatus() == PipeStatus.RUNNING) {
if (runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB) {
@@ -232,11 +224,14 @@ public class SyncService implements IService {
runningPipe.stop();
}
}
- syncInfoFetcher.stopPipe(pipeName);
+ TSStatus status = syncInfoFetcher.stopPipe(pipeName);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeException(status.message);
+ }
}
public synchronized void startPipe(String pipeName) throws PipeException {
- logger.info("Execute START PIPE {}", pipeName);
+ logger.info("Execute start PIPE {}", pipeName);
checkRunningPipeExistAndName(pipeName);
if (runningPipe.getStatus() == PipeStatus.STOP) {
if (runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB) {
@@ -246,11 +241,14 @@ public class SyncService implements IService {
startExternalPipeManager(true);
}
}
- syncInfoFetcher.startPipe(pipeName);
+ TSStatus status = syncInfoFetcher.startPipe(pipeName);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeException(status.message);
+ }
}
public synchronized void dropPipe(String pipeName) throws PipeException {
- logger.info("Execute DROP PIPE {}", pipeName);
+ logger.info("Execute drop PIPE {}", pipeName);
checkRunningPipeExistAndName(pipeName);
if (runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB) {
runningPipe.drop();
@@ -265,7 +263,10 @@ public class SyncService implements IService {
runningPipe.drop();
}
- syncInfoFetcher.dropPipe(pipeName);
+ TSStatus status = syncInfoFetcher.dropPipe(pipeName);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeException(status.message);
+ }
}
public List<PipeInfo> getAllPipeInfos() {
@@ -274,19 +275,19 @@ public class SyncService implements IService {
private void checkRunningPipeExistAndName(String pipeName) throws PipeException {
if (runningPipe == null || runningPipe.getStatus() == PipeStatus.DROP) {
- throw new PipeException("There is no existing pipe.");
+ throw new PipeException("There is no existing PIPE.");
}
if (!runningPipe.getName().equals(pipeName)) {
throw new PipeException(
String.format(
- "Pipe %s is %s, please retry after drop it.",
+ "PIPE %s is %s, please retry after drop it.",
runningPipe.getName(), runningPipe.getStatus()));
}
}
public synchronized void recordMessage(PipeMessage message) {
if (runningPipe == null || runningPipe.getStatus() == PipeStatus.DROP) {
- logger.info(String.format("No running pipe for message %s.", message));
+ logger.info(String.format("No running PIPE for message %s.", message));
return;
}
TSStatus status = null;
@@ -300,7 +301,7 @@ public class SyncService implements IService {
} catch (PipeException e) {
logger.error(
String.format(
- "Stop pipe %s when meeting error in sender service.", runningPipe.getName()),
+ "Stop PIPE %s when meeting error in sender service.", runningPipe.getName()),
e);
}
break;
@@ -317,14 +318,14 @@ public class SyncService implements IService {
}
}
- public List<TPipeInfo> showPipe(String pipeName) {
+ public List<TShowPipeInfo> showPipe(String pipeName) {
boolean showAll = StringUtils.isEmpty(pipeName);
- List<TPipeInfo> list = new ArrayList<>();
+ List<TShowPipeInfo> list = new ArrayList<>();
// show pipe in sender
for (PipeInfo pipe : SyncService.getInstance().getAllPipeInfos()) {
if (showAll || pipeName.equals(pipe.getPipeName())) {
- TPipeInfo tPipeInfo =
- new TPipeInfo(
+ TShowPipeInfo tPipeInfo =
+ new TShowPipeInfo(
pipe.getCreateTime(),
pipe.getPipeName(),
SyncConstant.ROLE_SENDER,
@@ -337,8 +338,8 @@ public class SyncService implements IService {
// show pipe in receiver
for (TSyncIdentityInfo identityInfo : receiverManager.getAllTSyncIdentityInfos()) {
if (showAll || pipeName.equals(identityInfo.getPipeName())) {
- TPipeInfo tPipeInfo =
- new TPipeInfo(
+ TShowPipeInfo tPipeInfo =
+ new TShowPipeInfo(
identityInfo.getCreateTime(),
identityInfo.getPipeName(),
SyncConstant.ROLE_RECEIVER,
@@ -358,37 +359,42 @@ public class SyncService implements IService {
// show pipe in sender
for (PipeInfo pipe : SyncService.getInstance().getAllPipeInfos()) {
if (showAll || plan.getPipeName().equals(pipe.getPipeName())) {
- RowRecord record = new RowRecord(0);
- record.addField(
- Binary.valueOf(DatetimeUtils.convertLongToDate(pipe.getCreateTime())), TSDataType.TEXT);
- record.addField(Binary.valueOf(pipe.getPipeName()), TSDataType.TEXT);
- record.addField(Binary.valueOf(IoTDBConstant.SYNC_SENDER_ROLE), TSDataType.TEXT);
- record.addField(Binary.valueOf(pipe.getPipeSinkName()), TSDataType.TEXT);
- record.addField(Binary.valueOf(pipe.getStatus().name()), TSDataType.TEXT);
- PipeSink pipeSink = syncInfoFetcher.getPipeSink(pipe.getPipeSinkName());
- if (pipeSink.getType() == PipeSink.PipeSinkType.ExternalPipe) { // for external pipe
- ExtPipePluginManager extPipePluginManager =
- SyncService.getInstance().getExternalPipeManager();
-
- if (extPipePluginManager != null) {
- String extPipeType = ((ExternalPipeSink) pipeSink).getExtPipeSinkTypeName();
- ExternalPipeStatus externalPipeStatus =
- extPipePluginManager.getExternalPipeStatus(extPipeType);
-
- // TODO(ext-pipe): Adapting to the new syntax of SHOW PIPE
- if (externalPipeStatus != null) {
- record.addField(
- Binary.valueOf(
- externalPipeStatus.getWriterInvocationFailures().toString()
- + ";"
- + externalPipeStatus.getWriterStatuses().toString()),
- TSDataType.TEXT);
+ try {
+ RowRecord record = new RowRecord(0);
+ record.addField(
+ Binary.valueOf(DatetimeUtils.convertLongToDate(pipe.getCreateTime())),
+ TSDataType.TEXT);
+ record.addField(Binary.valueOf(pipe.getPipeName()), TSDataType.TEXT);
+ record.addField(Binary.valueOf(IoTDBConstant.SYNC_SENDER_ROLE), TSDataType.TEXT);
+ record.addField(Binary.valueOf(pipe.getPipeSinkName()), TSDataType.TEXT);
+ record.addField(Binary.valueOf(pipe.getStatus().name()), TSDataType.TEXT);
+ PipeSink pipeSink = syncInfoFetcher.getPipeSink(pipe.getPipeSinkName());
+ if (pipeSink.getType() == PipeSink.PipeSinkType.ExternalPipe) { // for external pipe
+ ExtPipePluginManager extPipePluginManager =
+ SyncService.getInstance().getExternalPipeManager();
+
+ if (extPipePluginManager != null) {
+ String extPipeType = ((ExternalPipeSink) pipeSink).getExtPipeSinkTypeName();
+ ExternalPipeStatus externalPipeStatus =
+ extPipePluginManager.getExternalPipeStatus(extPipeType);
+
+ // TODO(ext-pipe): Adapting to the new syntax of SHOW PIPE
+ if (externalPipeStatus != null) {
+ record.addField(
+ Binary.valueOf(
+ externalPipeStatus.getWriterInvocationFailures().toString()
+ + ";"
+ + externalPipeStatus.getWriterStatuses().toString()),
+ TSDataType.TEXT);
+ }
}
+ } else {
+ record.addField(Binary.valueOf(pipe.getMessageType().name()), TSDataType.TEXT);
}
- } else {
- record.addField(Binary.valueOf(pipe.getMessageType().name()), TSDataType.TEXT);
+ listDataSet.putRecord(record);
+ } catch (Exception e) {
+ logger.error("failed to show pipe [{}] because {}", pipe.getPipeName(), e.getMessage());
}
- listDataSet.putRecord(record);
}
}
// show pipe in receiver
@@ -489,7 +495,7 @@ public class SyncService implements IService {
runningPipe.close();
} catch (PipeException e) {
logger.warn(
- String.format("Stop pipe %s error when stop Sender Service.", runningPipe.getName()),
+ String.format("Stop PIPE %s error when stop Sender Service.", runningPipe.getName()),
e);
}
}
@@ -516,13 +522,13 @@ public class SyncService implements IService {
return ServiceType.SYNC_SERVICE;
}
- private void recover() throws IOException, PipeException, StartupException {
+ private void recover() throws IOException, PipeException, PipeSinkException {
PipeInfo runningPipeInfo = syncInfoFetcher.getRunningPipeInfo();
if (runningPipeInfo == null || PipeStatus.DROP.equals(runningPipeInfo.getStatus())) {
return;
} else {
this.runningPipe =
- SyncPipeUtil.parsePipeInfoAsPipe(
+ SyncPipeUtil.parseTPipeSinkInfoAsPipeSink(
runningPipeInfo, syncInfoFetcher.getPipeSink(runningPipeInfo.getPipeSinkName()));
switch (runningPipeInfo.getStatus()) {
case RUNNING:
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
new file mode 100644
index 0000000000..b21c6a23f6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.common;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.consensus.PartitionRegionId;
+import org.apache.iotdb.commons.exception.sync.PipeSinkException;
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.commons.sync.pipe.PipeMessage;
+import org.apache.iotdb.commons.sync.pipesink.PipeSink;
+import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
+import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeInfo;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
+import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
+import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
+import org.apache.iotdb.rpc.RpcUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/** Only fetch read request. For write request, return SUCCESS directly. */
+public class ClusterSyncInfoFetcher implements ISyncInfoFetcher {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ClusterSyncInfoFetcher.class);
+
+ private static final IClientManager<PartitionRegionId, ConfigNodeClient>
+ CONFIG_NODE_CLIENT_MANAGER =
+ new IClientManager.Factory<PartitionRegionId, ConfigNodeClient>()
+ .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+
+ // region Interfaces of PipeSink
+
+ @Override
+ public TSStatus addPipeSink(CreatePipeSinkPlan plan) {
+ return RpcUtils.SUCCESS_STATUS;
+ }
+
+ @Override
+ public TSStatus addPipeSink(CreatePipeSinkStatement createPipeSinkStatement) {
+ return RpcUtils.SUCCESS_STATUS;
+ }
+
+ @Override
+ public TSStatus dropPipeSink(String name) {
+ return RpcUtils.SUCCESS_STATUS;
+ }
+
+  /**
+   * Fetches the PipeSink named {@code name} from the ConfigNode.
+   *
+   * @param name name of the PipeSink to look up
+   * @return the PipeSink parsed from the first entry of the ConfigNode response
+   * @throws PipeSinkException if the response contains no PipeSink, or if borrowing the client,
+   *     the RPC call, or parsing the response fails
+   */
+  @Override
+  public PipeSink getPipeSink(String name) throws PipeSinkException {
+    try (ConfigNodeClient configNodeClient =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+      TGetPipeSinkReq tGetPipeSinkReq = new TGetPipeSinkReq().setPipeSinkName(name);
+      TGetPipeSinkResp resp = configNodeClient.getPipeSink(tGetPipeSinkReq);
+      if (resp.getPipeSinkInfoList().isEmpty()) {
+        throw new PipeSinkException(
+            String.format("Failed to getPipeSink [%s] because it does not exist.", name));
+      }
+      return SyncPipeUtil.parseTPipeSinkInfoAsPipeSink(resp.getPipeSinkInfoList().get(0));
+    } catch (PipeSinkException e) {
+      // Already the exception type callers expect: propagate it unchanged instead of logging it
+      // as an error and re-wrapping it, which would discard its original stack trace.
+      throw e;
+    } catch (Exception e) {
+      LOGGER.error("Get PipeSink [{}] error because {}", name, e.getMessage(), e);
+      throw new PipeSinkException(e.getMessage());
+    }
+  }
+
+ @Override
+ public List<PipeSink> getAllPipeSinks() {
+ throw new UnsupportedOperationException();
+ }
+
+ // endregion
+
+ // region Interfaces of Pipe
+
+ @Override
+ public TSStatus addPipe(PipeInfo pipeInfo) {
+ return RpcUtils.SUCCESS_STATUS;
+ }
+
+ @Override
+ public TSStatus stopPipe(String pipeName) {
+ return RpcUtils.SUCCESS_STATUS;
+ }
+
+ @Override
+ public TSStatus startPipe(String pipeName) {
+ return RpcUtils.SUCCESS_STATUS;
+ }
+
+ @Override
+ public TSStatus dropPipe(String pipeName) {
+ return RpcUtils.SUCCESS_STATUS;
+ }
+
+ @Override
+ public List<PipeInfo> getAllPipeInfos() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public PipeInfo getRunningPipeInfo() {
+ throw new UnsupportedOperationException();
+ }
+
+  /**
+   * Write request: this fetcher records nothing and returns SUCCESS directly, like the other
+   * write operations of this class. A non-null status is returned so callers can inspect its
+   * code without a null check.
+   */
+  @Override
+  public TSStatus recordMsg(String pipeName, long createTime, PipeMessage message) {
+    return RpcUtils.SUCCESS_STATUS;
+  }
+
+ // endregion
+
+ // region singleton
+ private static class ClusterSyncInfoFetcherHolder {
+ private static final ClusterSyncInfoFetcher INSTANCE = new ClusterSyncInfoFetcher();
+
+ private ClusterSyncInfoFetcherHolder() {}
+ }
+
+ public static ClusterSyncInfoFetcher getInstance() {
+ return ClusterSyncInfoFetcher.ClusterSyncInfoFetcherHolder.INSTANCE;
+ }
+ // endregion
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
index 5266c16b4e..468456dc23 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
@@ -19,12 +19,11 @@
package org.apache.iotdb.db.sync.common;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.sync.PipeSinkException;
import org.apache.iotdb.commons.sync.pipe.PipeInfo;
import org.apache.iotdb.commons.sync.pipe.PipeMessage;
import org.apache.iotdb.commons.sync.pipesink.PipeSink;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import java.util.List;
@@ -39,17 +38,14 @@ public interface ISyncInfoFetcher {
TSStatus dropPipeSink(String name);
- PipeSink getPipeSink(String name);
+ PipeSink getPipeSink(String name) throws PipeSinkException;
List<PipeSink> getAllPipeSinks();
// endregion
// region Interfaces of Pipe
- // TODO(sync): delete this in new-standalone version
- TSStatus addPipe(CreatePipePlan plan, long createTime);
-
- TSStatus addPipe(CreatePipeStatement createPipeStatement, long createTime);
+ TSStatus addPipe(PipeInfo pipeInfo);
TSStatus stopPipe(String pipeName);
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java
index a90a67ba9a..ed56cd2520 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java
@@ -25,12 +25,11 @@ import org.apache.iotdb.commons.sync.persistence.SyncLogReader;
import org.apache.iotdb.commons.sync.persistence.SyncLogWriter;
import org.apache.iotdb.commons.sync.pipe.PipeInfo;
import org.apache.iotdb.commons.sync.pipe.PipeMessage;
+import org.apache.iotdb.commons.sync.pipe.PipeStatus;
import org.apache.iotdb.commons.sync.pipe.SyncOperation;
import org.apache.iotdb.commons.sync.pipesink.PipeSink;
import org.apache.iotdb.commons.sync.utils.SyncPathUtil;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
@@ -105,33 +104,29 @@ public class LocalSyncInfo {
// endregion
// region Implement of Pipe
- // TODO: delete this in new-standalone version
- public void addPipe(CreatePipePlan plan, long createTime) throws PipeException, IOException {
- if (!syncMetadata.isPipeSinkExist(plan.getPipeSinkName())) {
- throw new PipeException(String.format("Can not find pipeSink %s.", plan.getPipeSinkName()));
- }
- PipeSink pipeSink = getPipeSink(plan.getPipeSinkName());
- PipeInfo pipeInfo = SyncPipeUtil.parseCreatePipePlanAsPipeInfo(plan, pipeSink, createTime);
- syncMetadata.addPipe(pipeInfo, pipeSink);
- syncLogWriter.addPipe(pipeInfo);
- }
- public void addPipe(CreatePipeStatement createPipeStatement, long createTime)
- throws PipeException, IOException {
- if (!syncMetadata.isPipeSinkExist(createPipeStatement.getPipeSinkName())) {
- throw new PipeException(
- String.format("Can not find pipeSink %s.", createPipeStatement.getPipeSinkName()));
- }
- PipeSink pipeSink = getPipeSink(createPipeStatement.getPipeSinkName());
- PipeInfo pipeInfo =
- SyncPipeUtil.parseCreatePipePlanAsPipeInfo(createPipeStatement, pipeSink, createTime);
- syncMetadata.addPipe(pipeInfo, pipeSink);
+ public void addPipe(PipeInfo pipeInfo) throws PipeException, IOException {
+ syncMetadata.checkAddPipe(pipeInfo);
+ syncMetadata.addPipe(pipeInfo);
syncLogWriter.addPipe(pipeInfo);
}
public void operatePipe(String pipeName, SyncOperation syncOperation)
throws PipeException, IOException {
- syncMetadata.operatePipe(pipeName, syncOperation);
+ syncMetadata.checkIfPipeExist(pipeName);
+ switch (syncOperation) {
+ case START_PIPE:
+ syncMetadata.setPipeStatus(pipeName, PipeStatus.RUNNING);
+ break;
+ case STOP_PIPE:
+ syncMetadata.setPipeStatus(pipeName, PipeStatus.STOP);
+ break;
+ case DROP_PIPE:
+ syncMetadata.setPipeStatus(pipeName, PipeStatus.DROP);
+ break;
+ default:
+ throw new PipeException("Unknown operatorType " + syncOperation);
+ }
syncLogWriter.operatePipe(pipeName, syncOperation);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
index 0a6700f5bc..1e13ffa9f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
@@ -27,8 +27,6 @@ import org.apache.iotdb.commons.sync.pipe.SyncOperation;
import org.apache.iotdb.commons.sync.pipesink.PipeSink;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -94,19 +92,9 @@ public class LocalSyncInfoFetcher implements ISyncInfoFetcher {
// region Implement of Pipe
@Override
- public TSStatus addPipe(CreatePipePlan plan, long createTime) {
+ public TSStatus addPipe(PipeInfo pipeInfo) {
try {
- localSyncInfo.addPipe(plan, createTime);
- return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
- } catch (PipeException | IOException e) {
- return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
- }
- }
-
- @Override
- public TSStatus addPipe(CreatePipeStatement createPipeStatement, long createTime) {
- try {
- localSyncInfo.addPipe(createPipeStatement, createTime);
+ localSyncInfo.addPipe(pipeInfo);
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (PipeException e) {
return RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, e.getMessage());
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/ExternalPipeSink.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/ExternalPipeSink.java
index b926ac1bca..21c81e892b 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/ExternalPipeSink.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/ExternalPipeSink.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -132,6 +133,13 @@ public class ExternalPipeSink implements PipeSink {
sinkParams = ReadWriteIOUtils.readMap(inputStream);
}
+ @Override
+ public void deserialize(ByteBuffer buffer) {
+ pipeSinkName = ReadWriteIOUtils.readString(buffer);
+ extPipeSinkTypeName = ReadWriteIOUtils.readString(buffer);
+ sinkParams = ReadWriteIOUtils.readMap(buffer);
+ }
+
public Map<String, String> getSinkParams() {
return sinkParams;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java b/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java
index 9011e1c480..af7b06cde0 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.sync.pipe.PipeInfo;
import org.apache.iotdb.commons.sync.pipe.TsFilePipeInfo;
import org.apache.iotdb.commons.sync.pipesink.IoTDBPipeSink;
import org.apache.iotdb.commons.sync.pipesink.PipeSink;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
@@ -67,47 +68,9 @@ public class SyncPipeUtil {
}
// TODO(sync): delete this in new-standalone version
- public static Pipe parseCreatePipePlanAsPipe(
- CreatePipePlan plan, PipeSink pipeSink, long pipeCreateTime) throws PipeException {
- boolean syncDelOp = true;
- for (Pair<String, String> pair : plan.getPipeAttributes()) {
- pair.left = pair.left.toLowerCase();
- if ("syncdelop".equals(pair.left)) {
- syncDelOp = Boolean.parseBoolean(pair.right);
- } else {
- throw new PipeException(String.format("Can not recognition attribute %s", pair.left));
- }
- }
-
- return new TsFilePipe(
- pipeCreateTime, plan.getPipeName(), pipeSink, plan.getDataStartTimestamp(), syncDelOp);
- }
-
- public static Pipe parseCreatePipePlanAsPipe(
- CreatePipeStatement createPipeStatement, PipeSink pipeSink, long pipeCreateTime)
+ public static PipeInfo parseCreatePipePlanAsPipeInfo(CreatePipePlan plan, long pipeCreateTime)
throws PipeException {
boolean syncDelOp = true;
- for (Map.Entry<String, String> entry : createPipeStatement.getPipeAttributes().entrySet()) {
- String attributeKey = entry.getKey().toLowerCase();
- if ("syncdelop".equals(attributeKey)) {
- syncDelOp = Boolean.parseBoolean(entry.getValue());
- } else {
- throw new PipeException(String.format("Can not recognition attribute %s", entry.getKey()));
- }
- }
-
- return new TsFilePipe(
- pipeCreateTime,
- createPipeStatement.getPipeName(),
- pipeSink,
- createPipeStatement.getStartTime(),
- syncDelOp);
- }
-
- // TODO(sync): delete this in new-standalone version
- public static PipeInfo parseCreatePipePlanAsPipeInfo(
- CreatePipePlan plan, PipeSink pipeSink, long pipeCreateTime) throws PipeException {
- boolean syncDelOp = true;
for (Pair<String, String> pair : plan.getPipeAttributes()) {
pair.left = pair.left.toLowerCase();
if ("syncdelop".equals(pair.left)) {
@@ -119,15 +82,14 @@ public class SyncPipeUtil {
return new TsFilePipeInfo(
plan.getPipeName(),
- pipeSink.getPipeSinkName(),
+ plan.getPipeSinkName(),
pipeCreateTime,
plan.getDataStartTimestamp(),
syncDelOp);
}
public static PipeInfo parseCreatePipePlanAsPipeInfo(
- CreatePipeStatement createPipeStatement, PipeSink pipeSink, long pipeCreateTime)
- throws PipeException {
+ CreatePipeStatement createPipeStatement, long pipeCreateTime) throws PipeException {
boolean syncDelOp = true;
for (Map.Entry<String, String> entry : createPipeStatement.getPipeAttributes().entrySet()) {
String attributeKey = entry.getKey().toLowerCase();
@@ -140,14 +102,14 @@ public class SyncPipeUtil {
return new TsFilePipeInfo(
createPipeStatement.getPipeName(),
- pipeSink.getPipeSinkName(),
+ createPipeStatement.getPipeSinkName(),
pipeCreateTime,
createPipeStatement.getStartTime(),
syncDelOp);
}
/** parse PipeInfo to Pipe, ignore status */
- public static Pipe parsePipeInfoAsPipe(PipeInfo pipeInfo, PipeSink pipeSink)
+ public static Pipe parseTPipeSinkInfoAsPipeSink(PipeInfo pipeInfo, PipeSink pipeSink)
throws PipeException {
if (pipeInfo instanceof TsFilePipeInfo) {
return new TsFilePipe(
@@ -162,7 +124,8 @@ public class SyncPipeUtil {
}
/** parse TPipeSinkInfo to PipeSink */
- public static PipeSink parsePipeInfoAsPipe(TPipeSinkInfo pipeSinkInfo) throws PipeSinkException {
+ public static PipeSink parseTPipeSinkInfoAsPipeSink(TPipeSinkInfo pipeSinkInfo)
+ throws PipeSinkException {
if (pipeSinkInfo.getPipeSinkType().equals(PipeSink.PipeSinkType.IoTDB.name())) {
PipeSink pipeSink = new IoTDBPipeSink(pipeSinkInfo.getPipeSinkName());
pipeSink.setAttribute(pipeSinkInfo.getAttributes());
@@ -172,4 +135,25 @@ public class SyncPipeUtil {
throw new UnsupportedOperationException();
}
}
+
+  /**
+   * parse TPipeInfo to PipeInfo
+   *
+   * @param pipeInfo thrift description of the pipe; its attribute map may be unset
+   * @param pipeCreateTime create time to store in the resulting PipeInfo
+   * @return a TsFilePipeInfo built from the name, sink name, start time and attributes
+   * @throws PipeException if an attribute other than "syncdelop" (case-insensitive) is present
+   */
+  public static PipeInfo parseTPipeInfoAsPipeInfo(TPipeInfo pipeInfo, long pipeCreateTime)
+      throws PipeException {
+    boolean syncDelOp = true;
+    // attributes is an optional field of the thrift struct, so the getter may return null;
+    // in that case keep the default value of syncDelOp.
+    Map<String, String> attributes = pipeInfo.getAttributes();
+    if (attributes != null) {
+      for (Map.Entry<String, String> entry : attributes.entrySet()) {
+        String attributeKey = entry.getKey().toLowerCase();
+        if ("syncdelop".equals(attributeKey)) {
+          syncDelOp = Boolean.parseBoolean(entry.getValue());
+        } else {
+          throw new PipeException(
+              String.format("Can not recognition attribute %s", entry.getKey()));
+        }
+      }
+    }
+
+    return new TsFilePipeInfo(
+        pipeInfo.getPipeName(),
+        pipeInfo.getPipeSinkName(),
+        pipeCreateTime,
+        pipeInfo.getStartTime(),
+        syncDelOp);
+  }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/LocalSyncInfoTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/LocalSyncInfoTest.java
index 878835691e..5bb762befb 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/LocalSyncInfoTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/LocalSyncInfoTest.java
@@ -22,8 +22,8 @@ import org.apache.iotdb.commons.exception.sync.PipeException;
import org.apache.iotdb.commons.sync.pipe.PipeInfo;
import org.apache.iotdb.commons.sync.pipe.PipeMessage;
import org.apache.iotdb.commons.sync.pipe.SyncOperation;
+import org.apache.iotdb.commons.sync.pipe.TsFilePipeInfo;
import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import org.apache.iotdb.db.sync.common.LocalSyncInfo;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -59,21 +59,21 @@ public class LocalSyncInfoTest {
createPipeSinkPlan.addPipeSinkAttribute("ip", "127.0.0.1");
createPipeSinkPlan.addPipeSinkAttribute("port", "6670");
try {
- localSyncInfo.addPipe(new CreatePipePlan(pipe1, "demo"), createdTime1);
+ localSyncInfo.addPipe(new TsFilePipeInfo(pipe1, "demo", createdTime1, 0, true));
Assert.fail();
} catch (PipeException e) {
// throw exception because can not find pipeSink
}
localSyncInfo.addPipeSink(createPipeSinkPlan);
- localSyncInfo.addPipe(new CreatePipePlan(pipe1, "demo"), createdTime1);
+ localSyncInfo.addPipe(new TsFilePipeInfo(pipe1, "demo", createdTime1, 0, true));
try {
- localSyncInfo.addPipe(new CreatePipePlan(pipe2, "demo"), createdTime2);
+ localSyncInfo.addPipe(new TsFilePipeInfo(pipe2, "demo", createdTime2, 0, true));
Assert.fail();
} catch (PipeException e) {
// throw exception because only one pipe is allowed now
}
localSyncInfo.operatePipe(pipe1, SyncOperation.DROP_PIPE);
- localSyncInfo.addPipe(new CreatePipePlan(pipe2, "demo"), createdTime2);
+ localSyncInfo.addPipe(new TsFilePipeInfo(pipe2, "demo", createdTime2, 0, true));
localSyncInfo.operatePipe(pipe2, SyncOperation.STOP_PIPE);
localSyncInfo.operatePipe(pipe2, SyncOperation.START_PIPE);
Assert.assertEquals(1, localSyncInfo.getAllPipeSink().size());
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index d930bde8c8..4f4fc495d5 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -437,7 +437,7 @@ struct TGetPathsSetTemplatesResp {
}
// SYNC
-struct TPipeInfo {
+struct TShowPipeInfo {
1: required i64 createTime
2: required string pipeName
3: required string role
@@ -446,6 +446,13 @@ struct TPipeInfo {
6: required string message
}
+struct TPipeInfo {
+ 1: required string pipeName
+ 2: required string pipeSinkName
+ 3: required i64 startTime
+ 4: optional map<string, string> attributes
+}
+
struct TPipeSinkInfo {
1: required string pipeSinkName
2: required string pipeSinkType
@@ -465,9 +472,13 @@ struct TGetPipeSinkResp {
2: required list<TPipeSinkInfo> pipeSinkInfoList
}
+struct TShowPipeReq {
+ 1: optional string pipeName
+}
+
struct TShowPipeResp {
1: required common.TSStatus status
- 2: optional list<TPipeInfo> pipeInfoList
+ 2: optional list<TShowPipeInfo> pipeInfoList
}
struct TDeleteTimeSeriesReq{
@@ -828,6 +839,21 @@ service IConfigNodeRPCService {
/** Get PipeSink by name; if name is empty, get all PipeSinks */
TGetPipeSinkResp getPipeSink(TGetPipeSinkReq req)
+ /** Create Pipe */
+ common.TSStatus createPipe(TPipeInfo req)
+
+ /** Start Pipe */
+ common.TSStatus startPipe(string pipeName)
+
+ /** Stop Pipe */
+ common.TSStatus stopPipe(string pipeName)
+
+ /** Drop Pipe */
+ common.TSStatus dropPipe(string pipeName)
+
+ /** Show Pipe by name; if name is empty, show all Pipes */
+ TShowPipeResp showPipe(TShowPipeReq req)
+
// ======================================================
// TestTools
// ======================================================
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index 60cb2a2d43..cf35dd7faf 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -280,6 +280,16 @@ struct TDeleteTimeSeriesReq{
2: required binary pathPatternTree
}
+struct TCreatePipeOnDataNodeReq{
+ 1: required binary pipeInfo
+}
+
+struct TOperatePipeOnDataNodeReq {
+ 1: required string pipeName
+ // ordinal of {@linkplain SyncOperation}
+ 2: required i8 operation
+}
+
service IDataNodeRPCService {
// -----------------------------------For Data Node-----------------------------------------------
@@ -519,6 +529,16 @@ service IDataNodeRPCService {
* Delete matched timeseries and remove the corresponding schema black list in target schemaRegion
*/
common.TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req)
+
+ /**
+ * Create PIPE on DataNode
+ */
+ common.TSStatus createPipeOnDataNode(TCreatePipeOnDataNodeReq req)
+
+ /**
+ * Start, stop or drop PIPE on DataNode
+ */
+ common.TSStatus operatePipeOnDataNode(TOperatePipeOnDataNodeReq req)
}
service MPPDataExchangeService {