You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/07 18:05:55 UTC

[iotdb] 03/04: fix start / stop process on cn procedure

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5787
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 08625c6483f95f6dc7ee2cd3f465e67fca60eac6
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon May 8 01:27:23 2023 +0800

    fix start / stop process on cn procedure
---
 .../confignode/persistence/pipe/PipeTaskInfo.java  | 28 ++++++++++++++++------
 .../pipe/task/AbstractOperatePipeProcedureV2.java  |  7 ++++--
 .../impl/pipe/task/CreatePipeProcedureV2.java      |  6 ++---
 3 files changed, 29 insertions(+), 12 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index c3975a817d7..fa8c1ae9cdb 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -81,11 +81,17 @@ public class PipeTaskInfo implements SnapshotProcessor {
       return false;
     }
 
-    if (getPipeStatus(pipeName) == PipeStatus.RUNNING) {
+    final PipeStatus pipeStatus = getPipeStatus(pipeName);
+    if (pipeStatus == PipeStatus.RUNNING) {
       LOGGER.info(
           String.format("Failed to start pipe [%s], the pipe is already running", pipeName));
       return false;
     }
+    if (pipeStatus == PipeStatus.DROPPED) {
+      LOGGER.info(
+          String.format("Failed to start pipe [%s], the pipe is already dropped", pipeName));
+      return false;
+    }
 
     return true;
   }
@@ -96,21 +102,29 @@ public class PipeTaskInfo implements SnapshotProcessor {
       return false;
     }
 
-    if (getPipeStatus(pipeName) == PipeStatus.STOPPED) {
+    final PipeStatus pipeStatus = getPipeStatus(pipeName);
+    if (pipeStatus == PipeStatus.STOPPED) {
       LOGGER.info(String.format("Failed to stop pipe [%s], the pipe is already stop", pipeName));
       return false;
     }
+    if (pipeStatus == PipeStatus.DROPPED) {
+      LOGGER.info(String.format("Failed to stop pipe [%s], the pipe is already dropped", pipeName));
+      return false;
+    }
 
     return true;
   }
 
   public boolean checkBeforeDropPipe(String pipeName) {
-    if (isPipeExisted(pipeName)) {
-      return true;
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "Check before drop pipe {}, pipe exists: {}.",
+          pipeName,
+          isPipeExisted(pipeName) ? "true" : "false");
     }
-
-    LOGGER.info(String.format("Failed to drop pipe [%s], the pipe does not exist", pipeName));
-    return false;
+    // No matter whether the pipe exists, we allow the drop operation to be executed on all nodes
+    // to ensure consistency.
+    return true;
   }
 
   private boolean isPipeExisted(String pipeName) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
index 0d9f98101f8..7b2f5cac3ac 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
@@ -129,8 +129,11 @@ abstract class AbstractOperatePipeProcedureV2 extends AbstractNodeProcedure<Oper
       throws IOException, InterruptedException, ProcedureException {
     switch (state) {
       case VALIDATE_TASK:
-        rollbackFromValidateTask(env);
-        env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
+        try {
+          rollbackFromValidateTask(env);
+        } finally {
+          env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
+        }
         break;
       case CALCULATE_INFO_FOR_TASK:
         rollbackFromCalculateInfoForTask(env);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 995b2a977dd..d81a88fe534 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -103,9 +103,9 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
         .getLoadManager()
         .getRegionLeaderMap()
         .forEach(
-            (region, leader) -> {
-              consensusGroupIdToTaskMetaMap.put(region, new PipeTaskMeta(0, leader));
-            });
+            (region, leader) ->
+                // TODO: make index configurable
+                consensusGroupIdToTaskMetaMap.put(region, new PipeTaskMeta(0, leader)));
     pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);
   }