You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/06 19:59:28 UTC
[iotdb] 03/03: DN: startPipe & stopPipe
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch IOTDB-5787
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a9ca134f3b646f3623d19cb8f27dee171cbaf279
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun May 7 03:50:20 2023 +0800
DN: startPipe & stopPipe
---
.../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 116 ++++++++++++++++++++-
1 file changed, 113 insertions(+), 3 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index d99d891eb5f..fd401a325ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +44,6 @@ public class PipeTaskAgent {
final String pipeName = pipeMeta.getStaticMeta().getPipeName();
final long creationTime = pipeMeta.getStaticMeta().getCreationTime();
- // check if the pipe has already been created before
final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
if (existedPipeMeta != null) {
if (existedPipeMeta.getStaticMeta().getCreationTime() == creationTime) {
@@ -136,12 +136,122 @@ public class PipeTaskAgent {
public void dropPipeTaskByConsensusGroup(
String pipeName, long creationTime, TConsensusGroupId consensusGroupId) {}
- public void startPipe(String pipeName, long creationTime) {}
+ public void startPipe(String pipeName, long creationTime) {
+ final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+
+ if (existedPipeMeta == null) {
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has already been dropped or has not been created. Skip starting.",
+ pipeName,
+ creationTime);
+ return;
+ }
+ if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) {
+ throw new PipeManagementException(
+ String.format(
+ "Inconsistency between pipe meta and startPipe request detected. "
+ + "Pipe %s (creation time = %d) has been created but does not match the creation time (%d) in startPipe request.",
+ pipeName, existedPipeMeta.getStaticMeta().getCreationTime(), creationTime));
+ }
+
+ switch (existedPipeMeta.getRuntimeMeta().getStatus().get()) {
+ case STOPPED:
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has been created. Current status = {}. Starting.",
+ pipeName,
+ creationTime,
+ existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ break;
+ case RUNNING:
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has already been started. Current status = {}. Skip starting.",
+ pipeName,
+ creationTime,
+ existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ return;
+ case DROPPED:
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has already been dropped. Current status = {}. Skip starting.",
+ pipeName,
+ creationTime,
+ existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ return;
+ default:
+ throw new IllegalStateException(
+ "Unexpected status: " + existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ }
+
+ // start pipe task by consensus group
+ existedPipeMeta
+ .getRuntimeMeta()
+ .getConsensusGroupIdToTaskMetaMap()
+ .forEach(
+ ((consensusGroupId, pipeTaskMeta) -> {
+ startPipeTaskByConsensusGroup(pipeName, creationTime, consensusGroupId);
+ }));
+ // set pipe meta status to RUNNING
+ existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.RUNNING);
+ }
public void startPipeTaskByConsensusGroup(
String pipeName, long creationTime, TConsensusGroupId consensusGroupId) {}
- public void stopPipe(String pipeName, long creationTime) {}
+ public void stopPipe(String pipeName, long creationTime) {
+ final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+
+ if (existedPipeMeta == null) {
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has already been dropped or has not been created. Skip stopping.",
+ pipeName,
+ creationTime);
+ return;
+ }
+ if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) {
+ throw new PipeManagementException(
+ String.format(
+ "Inconsistency between pipe meta and stopPipe request detected. "
+ + "Pipe %s (creation time = %d) has been created but does not match the creation time (%d) in stopPipe request.",
+ pipeName, existedPipeMeta.getStaticMeta().getCreationTime(), creationTime));
+ }
+
+ switch (existedPipeMeta.getRuntimeMeta().getStatus().get()) {
+ case STOPPED:
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has already been stopped. Current status = {}. Skip stopping.",
+ pipeName,
+ creationTime,
+ existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ return;
+ case RUNNING:
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has been started. Current status = {}. Stopping.",
+ pipeName,
+ creationTime,
+ existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ break;
+ case DROPPED:
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has already been dropped. Current status = {}. Skip stopping.",
+ pipeName,
+ creationTime,
+ existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ return;
+ default:
+ throw new IllegalStateException(
+ "Unexpected status: " + existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ }
+
+ // stop pipe task by consensus group
+ existedPipeMeta
+ .getRuntimeMeta()
+ .getConsensusGroupIdToTaskMetaMap()
+ .forEach(
+ ((consensusGroupId, pipeTaskMeta) -> {
+ stopPipeTaskByConsensusGroup(pipeName, creationTime, consensusGroupId);
+ }));
+ // set pipe meta status to STOPPED
+ existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
+ }
public void stopPipeTaskByConsensusGroup(
String pipeName, long creationTime, TConsensusGroupId consensusGroupId) {}