You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/06 19:59:25 UTC

[iotdb] branch IOTDB-5787 updated (c16bc7f7a06 -> a9ca134f3b6)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a change to branch IOTDB-5787
in repository https://gitbox.apache.org/repos/asf/iotdb.git


    from c16bc7f7a06 Merge branch 'master' of github.com:apache/iotdb into IOTDB-5787
     new df99304cfa4 DN: createPipe
     new e45834c0708 DN: dropPipe
     new a9ca134f3b6 DN: startPipe & stopPipe

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    | 185 ++++++++++++++++++++-
 1 file changed, 181 insertions(+), 4 deletions(-)


[iotdb] 01/03: DN: createPipe

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5787
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit df99304cfa408c6f3c8d32f1e761d39358a6503f
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun May 7 03:16:35 2023 +0800

    DN: createPipe
---
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    | 33 +++++++++++++++++++++-
 1 file changed, 32 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index 40a79cd3f26..b27dfe97f4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -42,20 +42,51 @@ public class PipeTaskAgent {
     final String pipeName = pipeMeta.getStaticMeta().getPipeName();
     final long creationTime = pipeMeta.getStaticMeta().getCreationTime();
 
+    // check if the pipe has already been created before
     final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
     if (existedPipeMeta != null) {
       if (existedPipeMeta.getStaticMeta().getCreationTime() == creationTime) {
         switch (existedPipeMeta.getRuntimeMeta().getStatus().get()) {
           case STOPPED:
           case RUNNING:
+            LOGGER.info(
+                "Pipe {} (creation time = {}) has already been created. Current status = {}. Skip creating.",
+                pipeName,
+                creationTime,
+                existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+            return;
           case DROPPED:
+            LOGGER.info(
+                "Pipe {} (creation time = {}) has already been dropped. Current status = {}. Recreating.",
+                pipeName,
+                creationTime,
+                existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+            // break to drop the pipe meta and recreate it
+            break;
           default:
+            throw new IllegalStateException(
+                "Unexpected status: " + existedPipeMeta.getRuntimeMeta().getStatus().get().name());
         }
-        return;
       }
 
+      // drop the existing pipe if either
+      // 1. a pipe with the same name but a different creation time has been created before, or
+      // 2. the pipe with the same name and the same creation time has been dropped before, but
+      //  its pipe task meta has not been cleaned up
       dropPipe(pipeName, existedPipeMeta.getStaticMeta().getCreationTime());
     }
+
+    // build pipe task by consensus group
+    pipeMeta
+        .getRuntimeMeta()
+        .getConsensusGroupIdToTaskMetaMap()
+        .forEach(
+            ((consensusGroupId, pipeTaskMeta) -> {
+              createPipeTaskByConsensusGroup(
+                  pipeName, creationTime, consensusGroupId, pipeTaskMeta);
+            }));
+    // add pipe meta to pipe meta keeper
+    pipeMetaKeeper.addPipeMeta(pipeMeta.getStaticMeta().getPipeName(), pipeMeta);
   }
 
   public void createPipeTaskByConsensusGroup(


[iotdb] 02/03: DN: dropPipe

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5787
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e45834c0708311402fbed3b5f2cc2f6113cb4bf7
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun May 7 03:37:54 2023 +0800

    DN: dropPipe
---
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    | 38 +++++++++++++++++++++-
 1 file changed, 37 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index b27dfe97f4e..d99d891eb5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.agent.task;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,6 +87,8 @@ public class PipeTaskAgent {
                   pipeName, creationTime, consensusGroupId, pipeTaskMeta);
             }));
     // add pipe meta to pipe meta keeper
+    // note that we do not need to set the status of pipe meta here, because the status of pipe meta
+    // is already set to STOPPED when it is created
     pipeMetaKeeper.addPipeMeta(pipeMeta.getStaticMeta().getPipeName(), pipeMeta);
   }
 
@@ -95,7 +98,40 @@ public class PipeTaskAgent {
       TConsensusGroupId consensusGroupId,
       PipeTaskMeta pipeTaskMeta) {}
 
-  public void dropPipe(String pipeName, long creationTime) {}
+  public void dropPipe(String pipeName, long creationTime) {
+    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+
+    if (existedPipeMeta == null) { // no pipe meta is kept under this name: nothing to drop
+      LOGGER.info(
+          "Pipe {} (creation time = {}) has already been dropped or has not been created. Skip dropping.",
+          pipeName,
+          creationTime);
+      return;
+    }
+    if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) { // same name, other creation time
+      LOGGER.info(
+          "Pipe {} (creation time = {}) has been created but does not match the creation time ({}) in dropPipe request. Skip dropping.",
+          pipeName,
+          existedPipeMeta.getStaticMeta().getCreationTime(),
+          creationTime);
+      return; // unlike startPipe/stopPipe, a mismatch here is only logged, not thrown
+    }
+
+    // Mark the pipe meta as DROPPED before touching the tasks. If dropPipeTaskByConsensusGroup
+    // fails part-way, the meta is still in the keeper with status DROPPED, so a half-finished
+    // drop (pipe task meta not yet cleaned up) can be detected afterwards.
+    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
+    // drop the pipe task of every consensus group recorded in the runtime meta
+    existedPipeMeta
+        .getRuntimeMeta()
+        .getConsensusGroupIdToTaskMetaMap()
+        .forEach(
+            ((consensusGroupId, pipeTaskMeta) -> {
+              dropPipeTaskByConsensusGroup(pipeName, creationTime, consensusGroupId);
+            }));
+    // finally remove the pipe meta from the keeper; reached only if the loop above did not throw
+    pipeMetaKeeper.removePipeMeta(pipeName);
+  }
 
   public void dropPipeTaskByConsensusGroup(
       String pipeName, long creationTime, TConsensusGroupId consensusGroupId) {}


[iotdb] 03/03: DN: startPipe & stopPipe

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5787
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a9ca134f3b646f3623d19cb8f27dee171cbaf279
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun May 7 03:50:20 2023 +0800

    DN: startPipe & stopPipe
---
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    | 116 ++++++++++++++++++++-
 1 file changed, 113 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index d99d891eb5f..fd401a325ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +44,6 @@ public class PipeTaskAgent {
     final String pipeName = pipeMeta.getStaticMeta().getPipeName();
     final long creationTime = pipeMeta.getStaticMeta().getCreationTime();
 
-    // check if the pipe has already been created before
     final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
     if (existedPipeMeta != null) {
       if (existedPipeMeta.getStaticMeta().getCreationTime() == creationTime) {
@@ -136,12 +136,122 @@ public class PipeTaskAgent {
   public void dropPipeTaskByConsensusGroup(
       String pipeName, long creationTime, TConsensusGroupId consensusGroupId) {}
 
-  public void startPipe(String pipeName, long creationTime) {}
+  public void startPipe(String pipeName, long creationTime) {
+    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+
+    if (existedPipeMeta == null) { // no pipe meta is kept under this name: nothing to start
+      LOGGER.info(
+          "Pipe {} (creation time = {}) has already been dropped or has not been created. Skip starting.",
+          pipeName,
+          creationTime);
+      return;
+    }
+    if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) { // same name, other creation time
+      throw new PipeManagementException(
+          String.format(
+              "Inconsistency between pipe meta and startPipe request detected. "
+                  + "Pipe %s (creation time = %d) has been created but does not match the creation time (%d) in startPipe request.",
+              pipeName, existedPipeMeta.getStaticMeta().getCreationTime(), creationTime));
+    }
+
+    switch (existedPipeMeta.getRuntimeMeta().getStatus().get()) { // decide from the current status
+      case STOPPED:
+        LOGGER.info(
+            "Pipe {} (creation time = {}) has been created. Current status = {}. Starting.",
+            pipeName,
+            creationTime,
+            existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+        break; // only a STOPPED pipe continues to the start logic below the switch
+      case RUNNING:
+        LOGGER.info(
+            "Pipe {} (creation time = {}) has already been started. Current status = {}. Skip starting.",
+            pipeName,
+            creationTime,
+            existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+        return; // already RUNNING: nothing to do
+      case DROPPED:
+        LOGGER.info(
+            "Pipe {} (creation time = {}) has already been dropped. Current status = {}. Skip starting.",
+            pipeName,
+            creationTime,
+            existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+        return; // a DROPPED pipe is not started again by this method
+      default:
+        throw new IllegalStateException(
+            "Unexpected status: " + existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+    }
+
+    // start the pipe task of every consensus group recorded in the runtime meta
+    existedPipeMeta
+        .getRuntimeMeta()
+        .getConsensusGroupIdToTaskMetaMap()
+        .forEach(
+            ((consensusGroupId, pipeTaskMeta) -> {
+              startPipeTaskByConsensusGroup(pipeName, creationTime, consensusGroupId);
+            }));
+    // set the status to RUNNING last; reached only if the loop above did not throw
+    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.RUNNING);
+  }
 
   public void startPipeTaskByConsensusGroup(
       String pipeName, long creationTime, TConsensusGroupId consensusGroupId) {}
 
-  public void stopPipe(String pipeName, long creationTime) {}
+  public void stopPipe(String pipeName, long creationTime) {
+    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+
+    if (existedPipeMeta == null) { // no pipe meta is kept under this name: nothing to stop
+      LOGGER.info(
+          "Pipe {} (creation time = {}) has already been dropped or has not been created. Skip stopping.",
+          pipeName,
+          creationTime);
+      return;
+    }
+    if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) { // same name, other creation time
+      throw new PipeManagementException(
+          String.format(
+              "Inconsistency between pipe meta and stopPipe request detected. "
+                  + "Pipe %s (creation time = %d) has been created but does not match the creation time (%d) in stopPipe request.",
+              pipeName, existedPipeMeta.getStaticMeta().getCreationTime(), creationTime));
+    }
+
+    switch (existedPipeMeta.getRuntimeMeta().getStatus().get()) { // decide from the current status
+      case STOPPED:
+        LOGGER.info(
+            "Pipe {} (creation time = {}) has already been stopped. Current status = {}. Skip stopping.",
+            pipeName,
+            creationTime,
+            existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+        return; // already STOPPED: nothing to do
+      case RUNNING:
+        LOGGER.info(
+            "Pipe {} (creation time = {}) has been started. Current status = {}. Stopping.",
+            pipeName,
+            creationTime,
+            existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+        break; // only a RUNNING pipe continues to the stop logic below the switch
+      case DROPPED:
+        LOGGER.info(
+            "Pipe {} (creation time = {}) has already been dropped. Current status = {}. Skip stopping.",
+            pipeName,
+            creationTime,
+            existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+        return; // a DROPPED pipe is left untouched by this method
+      default:
+        throw new IllegalStateException(
+            "Unexpected status: " + existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+    }
+
+    // stop the pipe task of every consensus group recorded in the runtime meta
+    existedPipeMeta
+        .getRuntimeMeta()
+        .getConsensusGroupIdToTaskMetaMap()
+        .forEach(
+            ((consensusGroupId, pipeTaskMeta) -> {
+              stopPipeTaskByConsensusGroup(pipeName, creationTime, consensusGroupId);
+            }));
+    // set the status to STOPPED last; reached only if the loop above did not throw
+    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
+  }
 
   public void stopPipeTaskByConsensusGroup(
       String pipeName, long creationTime, TConsensusGroupId consensusGroupId) {}