You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/06 19:59:27 UTC

[iotdb] 02/03: DN: dropPipe

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5787
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e45834c0708311402fbed3b5f2cc2f6113cb4bf7
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun May 7 03:37:54 2023 +0800

    DN: dropPipe
---
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    | 38 +++++++++++++++++++++-
 1 file changed, 37 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index b27dfe97f4e..d99d891eb5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.agent.task;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,6 +87,8 @@ public class PipeTaskAgent {
                   pipeName, creationTime, consensusGroupId, pipeTaskMeta);
             }));
     // add pipe meta to pipe meta keeper
+    // Note: the pipe meta's status does not need to be set here, because it is already set to
+    // STOPPED when the pipe meta is created.
     pipeMetaKeeper.addPipeMeta(pipeMeta.getStaticMeta().getPipeName(), pipeMeta);
   }
 
@@ -95,7 +98,40 @@ public class PipeTaskAgent {
       TConsensusGroupId consensusGroupId,
       PipeTaskMeta pipeTaskMeta) {}
 
-  public void dropPipe(String pipeName, long creationTime) {}
+  public void dropPipe(String pipeName, long creationTime) {
+    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+
+    if (existedPipeMeta == null) {
+      LOGGER.info(
+          "Pipe {} (creation time = {}) has already been dropped or has not been created. Skip dropping.",
+          pipeName,
+          creationTime);
+      return;
+    }
+    if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) {
+      LOGGER.info(
+          "Pipe {} (creation time = {}) has been created but does not match the creation time ({}) in dropPipe request. Skip dropping.",
+          pipeName,
+          existedPipeMeta.getStaticMeta().getCreationTime(),
+          creationTime);
+      return;
+    }
+
+    // Mark the pipe meta as DROPPED first. This lets us detect the case where the pipe meta has
+    // been dropped but its pipe task metas have not been cleaned up (e.g. after a failure while
+    // executing dropPipeTaskByConsensusGroup).
+    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
+    // drop pipe task by consensus group
+    existedPipeMeta
+        .getRuntimeMeta()
+        .getConsensusGroupIdToTaskMetaMap()
+        .forEach(
+            ((consensusGroupId, pipeTaskMeta) -> {
+              dropPipeTaskByConsensusGroup(pipeName, creationTime, consensusGroupId);
+            }));
+    // remove pipe meta from pipe meta keeper
+    pipeMetaKeeper.removePipeMeta(pipeName);
+  }
 
   public void dropPipeTaskByConsensusGroup(
       String pipeName, long creationTime, TConsensusGroupId consensusGroupId) {}