You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/13 20:57:36 UTC
[iotdb] branch master updated: [IOTDB-5799] Pipe: Support `SHOW PIPES` clause (#9838)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 34fb7f532f6 [IOTDB-5799] Pipe: Support `SHOW PIPES` clause (#9838)
34fb7f532f6 is described below
commit 34fb7f532f6692488bbaf0c1f78625c6e671f770
Author: Itami Sho <42...@users.noreply.github.com>
AuthorDate: Sun May 14 04:57:28 2023 +0800
[IOTDB-5799] Pipe: Support `SHOW PIPES` clause (#9838)
Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
.../consensus/request/ConfigPhysicalPlan.java | 4 ++
.../consensus/request/ConfigPhysicalPlanType.java | 1 +
.../request/read/pipe/task/ShowPipePlanV2.java | 42 +++++++++++++++++++
.../response/pipe/task/PipeTableResp.java | 49 ++++++++++++++++++++++
.../manager/pipe/PipeTaskCoordinator.java | 23 +++++++++-
.../persistence/executor/ConfigPlanExecutor.java | 2 +
.../confignode/persistence/pipe/PipeTaskInfo.java | 10 +++++
.../src/main/thrift/confignode.thrift | 2 +-
8 files changed, 131 insertions(+), 2 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index b5065d7cff2..2991f05ea65 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.confignode.consensus.request.read.partition.GetSeriesSlo
import org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotListPlan;
import org.apache.iotdb.confignode.consensus.request.read.pipe.plugin.GetPipePluginJarPlan;
import org.apache.iotdb.confignode.consensus.request.read.pipe.plugin.GetPipePluginTablePlan;
+import org.apache.iotdb.confignode.consensus.request.read.pipe.task.ShowPipePlanV2;
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionIdPlan;
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
import org.apache.iotdb.confignode.consensus.request.read.template.CheckTemplateSettablePlan;
@@ -389,6 +390,9 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest {
case DropPipeV2:
plan = new DropPipePlanV2();
break;
+ case ShowPipeV2:
+ plan = new ShowPipePlanV2();
+ break;
case PipeHandleLeaderChange:
plan = new PipeHandleLeaderChangePlan();
break;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index 5977309515d..d36a9973cd5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -178,6 +178,7 @@ public enum ConfigPhysicalPlanType {
/** START PIPE & STOP PIPE */
SetPipeStatusV2((short) 1501),
DropPipeV2((short) 1502),
+ ShowPipeV2((short) 1503),
/** Pipe Runtime */
PipeHandleLeaderChange((short) 1600),
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/pipe/task/ShowPipePlanV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/pipe/task/ShowPipePlanV2.java
new file mode 100644
index 00000000000..8c11fb6f03a
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/pipe/task/ShowPipePlanV2.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.read.pipe.task;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Read plan backing the {@code SHOW PIPES} clause. The plan declares no fields of its own, so its
+ * serialized form consists solely of the plan type code.
+ */
+public class ShowPipePlanV2 extends ConfigPhysicalPlan {
+
+  /** Creates the plan, tagged with {@link ConfigPhysicalPlanType#ShowPipeV2}. */
+  public ShowPipePlanV2() {
+    super(ConfigPhysicalPlanType.ShowPipeV2);
+  }
+
+  /**
+   * Writes the plan type code to {@code stream} as a short; there is nothing else to write.
+   *
+   * @param stream destination of the serialized plan
+   * @throws IOException if writing to {@code stream} fails
+   */
+  @Override
+  protected void serializeImpl(DataOutputStream stream) throws IOException {
+    final int planTypeCode = getType().getPlanType();
+    stream.writeShort(planTypeCode);
+  }
+
+  /**
+   * Reads nothing from {@code buffer}: this plan has no fields to restore.
+   *
+   * @param buffer serialized plan content (left untouched)
+   */
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+    // Intentionally empty.
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
new file mode 100644
index 00000000000..a0416ede651
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.response.pipe.task;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
+import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
+import org.apache.iotdb.consensus.common.DataSet;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Consensus read result that pairs a status with a list of {@link PipeMeta} and can be converted
+ * into the thrift response {@link TGetAllPipeInfoResp}.
+ */
+public class PipeTableResp implements DataSet {
+
+  private final TSStatus status;
+  private final List<PipeMeta> allPipeMeta;
+
+  /**
+   * @param status status to report in the thrift response
+   * @param allPipeMeta pipe metas to serialize into the thrift response; stored as-is (no copy)
+   */
+  public PipeTableResp(TSStatus status, List<PipeMeta> allPipeMeta) {
+    this.status = status;
+    this.allPipeMeta = allPipeMeta;
+  }
+
+  /**
+   * Serializes every held {@link PipeMeta}, in list order, and wraps the resulting buffers together
+   * with the status in a {@link TGetAllPipeInfoResp}.
+   *
+   * @return the thrift response carrying the status and one buffer per pipe meta
+   * @throws IOException declared by this method; propagated from the serialization loop
+   */
+  public TGetAllPipeInfoResp convertToThriftResponse() throws IOException {
+    final List<ByteBuffer> serializedMetas = new ArrayList<>();
+    for (final PipeMeta meta : allPipeMeta) {
+      final ByteBuffer serialized = meta.serialize();
+      serializedMetas.add(serialized);
+    }
+    return new TGetAllPipeInfoResp(status, serializedMetas);
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
index 3c768c9206d..c99bd7d1a04 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
@@ -19,14 +19,25 @@
package org.apache.iotdb.confignode.manager.pipe;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.consensus.request.read.pipe.task.ShowPipePlanV2;
+import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
public class PipeTaskCoordinator {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskCoordinator.class);
+
private final ConfigManager configManager;
private final PipeTaskInfo pipeTaskInfo;
@@ -64,7 +75,17 @@ public class PipeTaskCoordinator {
}
public TGetAllPipeInfoResp showPipes() {
- throw new UnsupportedOperationException("Not implemented yet");
+ try {
+ return ((PipeTableResp)
+ configManager.getConsensusManager().read(new ShowPipePlanV2()).getDataset())
+ .convertToThriftResponse();
+ } catch (IOException e) {
+ LOGGER.error("Fail to get AllPipeInfo", e);
+ return new TGetAllPipeInfoResp(
+ new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+ .setMessage(e.getMessage()),
+ Collections.emptyList());
+ }
}
public TSStatus recordPipeMessage(TRecordPipeMessageReq req) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index abd02156f2c..2f812b173b2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -282,6 +282,8 @@ public class ConfigPlanExecutor {
return pipeInfo.getPipePluginInfo().showPipePlugins();
case GetPipePluginJar:
return pipeInfo.getPipePluginInfo().getPipePluginJar((GetPipePluginJarPlan) req);
+ case ShowPipeV2:
+ return pipeInfo.getPipeTaskInfo().showPipes();
default:
throw new UnknownPhysicalPlanTypeException(req.getType());
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 64f4b23100a..542e6bc6824 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -29,7 +29,9 @@ import org.apache.iotdb.confignode.consensus.request.write.pipe.coordinator.Pipe
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
+import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
@@ -41,6 +43,8 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
public class PipeTaskInfo implements SnapshotProcessor {
@@ -162,6 +166,12 @@ public class PipeTaskInfo implements SnapshotProcessor {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
+ public DataSet showPipes() {
+ return new PipeTableResp(
+ new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
+ StreamSupport.stream(getPipeMetaList().spliterator(), false).collect(Collectors.toList()));
+ }
+
public Iterable<PipeMeta> getPipeMetaList() {
return pipeMetaKeeper.getPipeMetaList();
}
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 8ae61f53535..8fa78991448 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -615,7 +615,7 @@ struct TShowPipeInfo {
struct TGetAllPipeInfoResp{
1: required common.TSStatus status
- 2: optional list<binary> allPipeInfo
+ 2: required list<binary> allPipeInfo // NOTE(review): optional -> required; confirm every producer always sets this list (showPipes passes an empty list on error) and no older peer omits it
}
struct TCreatePipeReq {