You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/13 20:57:36 UTC

[iotdb] branch master updated: [IOTDB-5799] Pipe: Support `SHOW PIPES` clause (#9838)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 34fb7f532f6 [IOTDB-5799] Pipe: Support `SHOW PIPES` clause (#9838)
34fb7f532f6 is described below

commit 34fb7f532f6692488bbaf0c1f78625c6e671f770
Author: Itami Sho <42...@users.noreply.github.com>
AuthorDate: Sun May 14 04:57:28 2023 +0800

    [IOTDB-5799] Pipe: Support `SHOW PIPES` clause (#9838)
    
    Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
 .../consensus/request/ConfigPhysicalPlan.java      |  4 ++
 .../consensus/request/ConfigPhysicalPlanType.java  |  1 +
 .../request/read/pipe/task/ShowPipePlanV2.java     | 42 +++++++++++++++++++
 .../response/pipe/task/PipeTableResp.java          | 49 ++++++++++++++++++++++
 .../manager/pipe/PipeTaskCoordinator.java          | 23 +++++++++-
 .../persistence/executor/ConfigPlanExecutor.java   |  2 +
 .../confignode/persistence/pipe/PipeTaskInfo.java  | 10 +++++
 .../src/main/thrift/confignode.thrift              |  2 +-
 8 files changed, 131 insertions(+), 2 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index b5065d7cff2..2991f05ea65 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.confignode.consensus.request.read.partition.GetSeriesSlo
 import org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotListPlan;
 import org.apache.iotdb.confignode.consensus.request.read.pipe.plugin.GetPipePluginJarPlan;
 import org.apache.iotdb.confignode.consensus.request.read.pipe.plugin.GetPipePluginTablePlan;
+import org.apache.iotdb.confignode.consensus.request.read.pipe.task.ShowPipePlanV2;
 import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionIdPlan;
 import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
 import org.apache.iotdb.confignode.consensus.request.read.template.CheckTemplateSettablePlan;
@@ -389,6 +390,9 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest {
         case DropPipeV2:
           plan = new DropPipePlanV2();
           break;
+        case ShowPipeV2:
+          plan = new ShowPipePlanV2();
+          break;
         case PipeHandleLeaderChange:
           plan = new PipeHandleLeaderChangePlan();
           break;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index 5977309515d..d36a9973cd5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -178,6 +178,7 @@ public enum ConfigPhysicalPlanType {
   /** START PIPE & STOP PIPE */
   SetPipeStatusV2((short) 1501),
   DropPipeV2((short) 1502),
+  ShowPipeV2((short) 1503),
 
   /** Pipe Runtime */
   PipeHandleLeaderChange((short) 1600),
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/pipe/task/ShowPipePlanV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/pipe/task/ShowPipePlanV2.java
new file mode 100644
index 00000000000..8c11fb6f03a
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/pipe/task/ShowPipePlanV2.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.read.pipe.task;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class ShowPipePlanV2 extends ConfigPhysicalPlan {
+
+  public ShowPipePlanV2() {
+    super(ConfigPhysicalPlanType.ShowPipeV2);
+  }
+
+  @Override
+  protected void serializeImpl(DataOutputStream stream) throws IOException {
+    stream.writeShort(getType().getPlanType());
+  }
+
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {}
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
new file mode 100644
index 00000000000..a0416ede651
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.response.pipe.task;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
+import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
+import org.apache.iotdb.consensus.common.DataSet;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class PipeTableResp implements DataSet {
+
+  private final TSStatus status;
+  private final List<PipeMeta> allPipeMeta;
+
+  public PipeTableResp(TSStatus status, List<PipeMeta> allPipeMeta) {
+    this.status = status;
+    this.allPipeMeta = allPipeMeta;
+  }
+
+  public TGetAllPipeInfoResp convertToThriftResponse() throws IOException {
+    final List<ByteBuffer> pipeInformationByteBuffers = new ArrayList<>();
+    for (PipeMeta pipeMeta : allPipeMeta) {
+      pipeInformationByteBuffers.add(pipeMeta.serialize());
+    }
+    return new TGetAllPipeInfoResp(status, pipeInformationByteBuffers);
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
index 3c768c9206d..c99bd7d1a04 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
@@ -19,14 +19,25 @@
 package org.apache.iotdb.confignode.manager.pipe;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.consensus.request.read.pipe.task.ShowPipePlanV2;
+import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
 
 public class PipeTaskCoordinator {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskCoordinator.class);
+
   private final ConfigManager configManager;
   private final PipeTaskInfo pipeTaskInfo;
 
@@ -64,7 +75,17 @@ public class PipeTaskCoordinator {
   }
 
   public TGetAllPipeInfoResp showPipes() {
-    throw new UnsupportedOperationException("Not implemented yet");
+    try {
+      return ((PipeTableResp)
+              configManager.getConsensusManager().read(new ShowPipePlanV2()).getDataset())
+          .convertToThriftResponse();
+    } catch (IOException e) {
+      LOGGER.error("Fail to get AllPipeInfo", e);
+      return new TGetAllPipeInfoResp(
+          new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+              .setMessage(e.getMessage()),
+          Collections.emptyList());
+    }
   }
 
   public TSStatus recordPipeMessage(TRecordPipeMessageReq req) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index abd02156f2c..2f812b173b2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -282,6 +282,8 @@ public class ConfigPlanExecutor {
         return pipeInfo.getPipePluginInfo().showPipePlugins();
       case GetPipePluginJar:
         return pipeInfo.getPipePluginInfo().getPipePluginJar((GetPipePluginJarPlan) req);
+      case ShowPipeV2:
+        return pipeInfo.getPipeTaskInfo().showPipes();
       default:
         throw new UnknownPhysicalPlanTypeException(req.getType());
     }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 64f4b23100a..542e6bc6824 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -29,7 +29,9 @@ import org.apache.iotdb.confignode.consensus.request.write.pipe.coordinator.Pipe
 import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
+import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
@@ -41,6 +43,8 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 public class PipeTaskInfo implements SnapshotProcessor {
 
@@ -162,6 +166,12 @@ public class PipeTaskInfo implements SnapshotProcessor {
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
+  public DataSet showPipes() {
+    return new PipeTableResp(
+        new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
+        StreamSupport.stream(getPipeMetaList().spliterator(), false).collect(Collectors.toList()));
+  }
+
   public Iterable<PipeMeta> getPipeMetaList() {
     return pipeMetaKeeper.getPipeMetaList();
   }
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 8ae61f53535..8fa78991448 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -615,7 +615,7 @@ struct TShowPipeInfo {
 
 struct TGetAllPipeInfoResp{
   1: required common.TSStatus status
-  2: optional list<binary> allPipeInfo
+  2: required list<binary> allPipeInfo
 }
 
 struct TCreatePipeReq {