You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/06 18:09:57 UTC

[iotdb] 02/03: createPipe impl (TBD)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5787
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 15f62d4efe759856fb2a517e840d7cfd9c028843
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Fri Apr 28 19:23:18 2023 +0800

    createPipe impl (TBD)
---
 .../commons/pipe/task/meta/PipeStaticMeta.java     | 18 ++++++++--------
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    | 24 +++++++++++++++++++++-
 2 files changed, 32 insertions(+), 10 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
index 3b9c94c94ce..d04549c4c37 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
@@ -32,7 +32,7 @@ import java.util.Map;
 public class PipeStaticMeta {
 
   private String pipeName;
-  private long createTime;
+  private long creationTime;
 
   private Map<String, String> collectorAttributes = new HashMap<>();
   private Map<String, String> processorAttributes = new HashMap<>();
@@ -46,12 +46,12 @@ public class PipeStaticMeta {
 
   public PipeStaticMeta(
       String pipeName,
-      long createTime,
+      long creationTime,
       Map<String, String> collectorAttributes,
       Map<String, String> processorAttributes,
       Map<String, String> connectorAttributes) {
     this.pipeName = pipeName.toUpperCase();
-    this.createTime = createTime;
+    this.creationTime = creationTime;
     this.collectorAttributes = collectorAttributes;
     this.processorAttributes = processorAttributes;
     this.connectorAttributes = connectorAttributes;
@@ -64,8 +64,8 @@ public class PipeStaticMeta {
     return pipeName;
   }
 
-  public long getCreateTime() {
-    return createTime;
+  public long getCreationTime() {
+    return creationTime;
   }
 
   public PipeParameters getCollectorParameters() {
@@ -89,7 +89,7 @@ public class PipeStaticMeta {
 
   public void serialize(DataOutputStream outputStream) throws IOException {
     ReadWriteIOUtils.write(pipeName, outputStream);
-    ReadWriteIOUtils.write(createTime, outputStream);
+    ReadWriteIOUtils.write(creationTime, outputStream);
 
     outputStream.writeInt(collectorAttributes.size());
     for (Map.Entry<String, String> entry : collectorAttributes.entrySet()) {
@@ -117,7 +117,7 @@ public class PipeStaticMeta {
     final PipeStaticMeta pipeStaticMeta = new PipeStaticMeta();
 
     pipeStaticMeta.pipeName = ReadWriteIOUtils.readString(byteBuffer);
-    pipeStaticMeta.createTime = ReadWriteIOUtils.readLong(byteBuffer);
+    pipeStaticMeta.creationTime = ReadWriteIOUtils.readLong(byteBuffer);
 
     int size = byteBuffer.getInt();
     for (int i = 0; i < size; ++i) {
@@ -152,7 +152,7 @@ public class PipeStaticMeta {
     }
     PipeStaticMeta that = (PipeStaticMeta) obj;
     return pipeName.equals(that.pipeName)
-        && createTime == that.createTime
+        && creationTime == that.creationTime
         && collectorAttributes.equals(that.collectorAttributes)
         && processorAttributes.equals(that.processorAttributes)
         && connectorAttributes.equals(that.connectorAttributes);
@@ -170,7 +170,7 @@ public class PipeStaticMeta {
         + pipeName
         + '\''
         + ", createTime="
-        + createTime
+        + creationTime
         + ", collectorAttributes="
         + collectorAttributes
         + ", processorAttributes="
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index eb0ca8f6f26..40a79cd3f26 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -23,9 +23,13 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class PipeTaskAgent {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskAgent.class);
+
   private final PipeMetaKeeper pipeMetaKeeper;
 
   public PipeTaskAgent() {
@@ -34,7 +38,25 @@ public class PipeTaskAgent {
 
   ////////////////////////// Pipe Task Management //////////////////////////
 
-  public void createPipe(PipeMeta pipeMeta) {}
+  public void createPipe(PipeMeta pipeMeta) {
+    final String pipeName = pipeMeta.getStaticMeta().getPipeName();
+    final long creationTime = pipeMeta.getStaticMeta().getCreationTime();
+
+    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+    if (existedPipeMeta != null) {
+      if (existedPipeMeta.getStaticMeta().getCreationTime() == creationTime) {
+        switch (existedPipeMeta.getRuntimeMeta().getStatus().get()) {
+          case STOPPED:
+          case RUNNING:
+          case DROPPED:
+          default:
+        }
+        return;
+      }
+
+      dropPipe(pipeName, existedPipeMeta.getStaticMeta().getCreationTime());
+    }
+  }
 
   public void createPipeTaskByConsensusGroup(
       String pipeName,