You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/06 19:59:28 UTC

[iotdb] 03/03: DN: startPipe & stopPipe

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5787
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a9ca134f3b646f3623d19cb8f27dee171cbaf279
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun May 7 03:50:20 2023 +0800

    DN: startPipe & stopPipe
---
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    | 116 ++++++++++++++++++++-
 1 file changed, 113 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index d99d891eb5f..fd401a325ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +44,6 @@ public class PipeTaskAgent {
     final String pipeName = pipeMeta.getStaticMeta().getPipeName();
     final long creationTime = pipeMeta.getStaticMeta().getCreationTime();
 
-    // check if the pipe has already been created before
     final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
     if (existedPipeMeta != null) {
       if (existedPipeMeta.getStaticMeta().getCreationTime() == creationTime) {
@@ -136,12 +136,122 @@ public class PipeTaskAgent {
   public void dropPipeTaskByConsensusGroup(
       String pipeName, long creationTime, TConsensusGroupId consensusGroupId) {}
 
-  public void startPipe(String pipeName, long creationTime) {}
+  public void startPipe(String pipeName, long creationTime) {
+    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+
+    if (existedPipeMeta == null) {
+      LOGGER.info(
+          "Pipe {} (creation time = {}) has already been dropped or has not been created. Skip starting.",
+          pipeName,
+          creationTime);
+      return;
+    }
+    if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) {
+      throw new PipeManagementException(
+          String.format(
+              "Inconsistency between pipe meta and startPipe request detected. "
+                  + "Pipe %s (creation time = %d) has been created but does not match the creation time (%d) in startPipe request.",
+              pipeName, existedPipeMeta.getStaticMeta().getCreationTime(), creationTime));
+    }
+
+    switch (existedPipeMeta.getRuntimeMeta().getStatus().get()) {
+      case STOPPED:
+        LOGGER.info(
+            "Pipe {} (creation time = {}) has been created. Current status = {}. Starting.",
+            pipeName,
+            creationTime,
+            existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+        break;
+      case RUNNING:
+        LOGGER.info(
+            "Pipe {} (creation time = {}) has already been started. Current status = {}. Skip starting.",
+            pipeName,
+            creationTime,
+            existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+        return;
+      case DROPPED:
+        LOGGER.info(
+            "Pipe {} (creation time = {}) has already been dropped. Current status = {}. Skip starting.",
+            pipeName,
+            creationTime,
+            existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+        return;
+      default:
+        throw new IllegalStateException(
+            "Unexpected status: " + existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+    }
+
+    // start pipe task by consensus group
+    existedPipeMeta
+        .getRuntimeMeta()
+        .getConsensusGroupIdToTaskMetaMap()
+        .forEach(
+            ((consensusGroupId, pipeTaskMeta) -> {
+              startPipeTaskByConsensusGroup(pipeName, creationTime, consensusGroupId);
+            }));
+    // set pipe meta status to RUNNING
+    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.RUNNING);
+  }
 
   public void startPipeTaskByConsensusGroup(
       String pipeName, long creationTime, TConsensusGroupId consensusGroupId) {}
 
-  public void stopPipe(String pipeName, long creationTime) {}
+  public void stopPipe(String pipeName, long creationTime) {
+    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+
+    if (existedPipeMeta == null) {
+      LOGGER.info(
+          "Pipe {} (creation time = {}) has already been dropped or has not been created. Skip stopping.",
+          pipeName,
+          creationTime);
+      return;
+    }
+    if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) {
+      throw new PipeManagementException(
+          String.format(
+              "Inconsistency between pipe meta and stopPipe request detected. "
+                  + "Pipe %s (creation time = %d) has been created but does not match the creation time (%d) in stopPipe request.",
+              pipeName, existedPipeMeta.getStaticMeta().getCreationTime(), creationTime));
+    }
+
+    switch (existedPipeMeta.getRuntimeMeta().getStatus().get()) {
+      case STOPPED:
+        LOGGER.info(
+            "Pipe {} (creation time = {}) has already been stopped. Current status = {}. Skip stopping.",
+            pipeName,
+            creationTime,
+            existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+        return;
+      case RUNNING:
+        LOGGER.info(
+            "Pipe {} (creation time = {}) has been started. Current status = {}. Stopping.",
+            pipeName,
+            creationTime,
+            existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+        break;
+      case DROPPED:
+        LOGGER.info(
+            "Pipe {} (creation time = {}) has already been dropped. Current status = {}. Skip stopping.",
+            pipeName,
+            creationTime,
+            existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+        return;
+      default:
+        throw new IllegalStateException(
+            "Unexpected status: " + existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+    }
+
+    // stop pipe task by consensus group
+    existedPipeMeta
+        .getRuntimeMeta()
+        .getConsensusGroupIdToTaskMetaMap()
+        .forEach(
+            ((consensusGroupId, pipeTaskMeta) -> {
+              stopPipeTaskByConsensusGroup(pipeName, creationTime, consensusGroupId);
+            }));
+    // set pipe meta status to STOPPED
+    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
+  }
 
   public void stopPipeTaskByConsensusGroup(
       String pipeName, long creationTime, TConsensusGroupId consensusGroupId) {}