You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/06 19:59:27 UTC
[iotdb] 02/03: DN: dropPipe
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch IOTDB-5787
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e45834c0708311402fbed3b5f2cc2f6113cb4bf7
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun May 7 03:37:54 2023 +0800
DN: dropPipe
---
.../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 38 +++++++++++++++++++++-
1 file changed, 37 insertions(+), 1 deletion(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index b27dfe97f4e..d99d891eb5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.agent.task;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,6 +87,8 @@ public class PipeTaskAgent {
pipeName, creationTime, consensusGroupId, pipeTaskMeta);
}));
// add pipe meta to pipe meta keeper
+ // note that we do not need to set the status of pipe meta here, because the status of pipe meta
+ // is already set to STOPPED when it is created
pipeMetaKeeper.addPipeMeta(pipeMeta.getStaticMeta().getPipeName(), pipeMeta);
}
@@ -95,7 +98,40 @@ public class PipeTaskAgent {
TConsensusGroupId consensusGroupId,
PipeTaskMeta pipeTaskMeta) {}
- public void dropPipe(String pipeName, long creationTime) {}
+ public void dropPipe(String pipeName, long creationTime) {
+ final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+
+ if (existedPipeMeta == null) {
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has already been dropped or has not been created. Skip dropping.",
+ pipeName,
+ creationTime);
+ return;
+ }
+ if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) {
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has been created but does not match the creation time ({}) in dropPipe request. Skip dropping.",
+ pipeName,
+ existedPipeMeta.getStaticMeta().getCreationTime(),
+ creationTime);
+ return;
+ }
+
+ // mark pipe meta as dropped first. this will help us detect if the pipe meta has been dropped
+ // but the pipe task meta has not been cleaned up (in case of failure when executing
+ // dropPipeTaskByConsensusGroup).
+ existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
+ // drop pipe task by consensus group
+ existedPipeMeta
+ .getRuntimeMeta()
+ .getConsensusGroupIdToTaskMetaMap()
+ .forEach(
+ ((consensusGroupId, pipeTaskMeta) -> {
+ dropPipeTaskByConsensusGroup(pipeName, creationTime, consensusGroupId);
+ }));
+ // remove pipe meta from pipe meta keeper
+ pipeMetaKeeper.removePipeMeta(pipeName);
+ }
public void dropPipeTaskByConsensusGroup(
String pipeName, long creationTime, TConsensusGroupId consensusGroupId) {}