You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/07 18:05:55 UTC
[iotdb] 03/04: fix start / stop process on cn procedure
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch IOTDB-5787
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 08625c6483f95f6dc7ee2cd3f465e67fca60eac6
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon May 8 01:27:23 2023 +0800
fix start / stop process on cn procedure
---
.../confignode/persistence/pipe/PipeTaskInfo.java | 28 ++++++++++++++++------
.../pipe/task/AbstractOperatePipeProcedureV2.java | 7 ++++--
.../impl/pipe/task/CreatePipeProcedureV2.java | 6 ++---
3 files changed, 29 insertions(+), 12 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index c3975a817d7..fa8c1ae9cdb 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -81,11 +81,17 @@ public class PipeTaskInfo implements SnapshotProcessor {
return false;
}
- if (getPipeStatus(pipeName) == PipeStatus.RUNNING) {
+ final PipeStatus pipeStatus = getPipeStatus(pipeName);
+ if (pipeStatus == PipeStatus.RUNNING) {
LOGGER.info(
String.format("Failed to start pipe [%s], the pipe is already running", pipeName));
return false;
}
+ if (pipeStatus == PipeStatus.DROPPED) {
+ LOGGER.info(
+ String.format("Failed to start pipe [%s], the pipe is already dropped", pipeName));
+ return false;
+ }
return true;
}
@@ -96,21 +102,29 @@ public class PipeTaskInfo implements SnapshotProcessor {
return false;
}
- if (getPipeStatus(pipeName) == PipeStatus.STOPPED) {
+ final PipeStatus pipeStatus = getPipeStatus(pipeName);
+ if (pipeStatus == PipeStatus.STOPPED) {
LOGGER.info(String.format("Failed to stop pipe [%s], the pipe is already stop", pipeName));
return false;
}
+ if (pipeStatus == PipeStatus.DROPPED) {
+ LOGGER.info(String.format("Failed to stop pipe [%s], the pipe is already dropped", pipeName));
+ return false;
+ }
return true;
}
public boolean checkBeforeDropPipe(String pipeName) {
- if (isPipeExisted(pipeName)) {
- return true;
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Check before drop pipe {}, pipe exists: {}.",
+ pipeName,
+ isPipeExisted(pipeName) ? "true" : "false");
}
-
- LOGGER.info(String.format("Failed to drop pipe [%s], the pipe does not exist", pipeName));
- return false;
+ // Regardless of whether the pipe exists, allow the drop operation to run on all nodes to
+ // ensure consistency.
+ return true;
}
private boolean isPipeExisted(String pipeName) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
index 0d9f98101f8..7b2f5cac3ac 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
@@ -129,8 +129,11 @@ abstract class AbstractOperatePipeProcedureV2 extends AbstractNodeProcedure<Oper
throws IOException, InterruptedException, ProcedureException {
switch (state) {
case VALIDATE_TASK:
- rollbackFromValidateTask(env);
- env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
+ try {
+ rollbackFromValidateTask(env);
+ } finally {
+ env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
+ }
break;
case CALCULATE_INFO_FOR_TASK:
rollbackFromCalculateInfoForTask(env);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 995b2a977dd..d81a88fe534 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -103,9 +103,9 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
.getLoadManager()
.getRegionLeaderMap()
.forEach(
- (region, leader) -> {
- consensusGroupIdToTaskMetaMap.put(region, new PipeTaskMeta(0, leader));
- });
+ (region, leader) ->
+ // TODO: make index configurable
+ consensusGroupIdToTaskMetaMap.put(region, new PipeTaskMeta(0, leader)));
pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);
}