You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/07 18:05:52 UTC

[iotdb] branch IOTDB-5787 updated (a9ca134f3b6 -> 4987a74800e)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a change to branch IOTDB-5787
in repository https://gitbox.apache.org/repos/asf/iotdb.git


    from a9ca134f3b6 DN: startPipe & stopPipe
     add 478e4d17952 [IOTDB-5839] Pipe task management (CN -> DN): squash all operation rpcs into one (#9750)
     new f39f6744773 merge master
     new 4d2f0aeddcf fix start / stop process
     new 08625c6483f fix start / stop process on cn procedure
     new 4987a74800e state transition diagram of a pipe task

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../confignode/client/DataNodeRequestType.java     |  7 +--
 .../client/async/AsyncDataNodeClientPool.java      | 15 ++---
 .../client/async/handlers/AsyncClientHandler.java  |  1 +
 .../confignode/persistence/pipe/PipeTaskInfo.java  | 32 +++++++---
 .../procedure/env/ConfigNodeProcedureEnv.java      | 23 ++-----
 .../pipe/task/AbstractOperatePipeProcedureV2.java  | 50 +++++++++++++--
 .../impl/pipe/task/CreatePipeProcedureV2.java      | 34 ++---------
 .../impl/pipe/task/DropPipeProcedureV2.java        | 16 +----
 .../impl/pipe/task/StartPipeProcedureV2.java       | 28 ++-------
 .../impl/pipe/task/StopPipeProcedureV2.java        | 28 ++-------
 .../iotdb/confignode/persistence/PipeInfoTest.java |  1 +
 .../iotdb/commons/pipe/task/meta/PipeMeta.java     | 10 ++-
 .../commons/pipe/task/meta/PipeMetaKeeper.java     |  4 ++
 .../commons/pipe/task/meta/PipeRuntimeMeta.java    | 20 +-----
 .../commons/pipe/task/meta/PipeStaticMeta.java     | 65 ++++++++++----------
 .../iotdb/commons/pipe/task/meta/PipeTaskMeta.java | 51 +++++++++++++++-
 .../iotdb/pipe/api/customizer/PipeParameters.java  | 22 +++++++
 ...tion.java => PipeRuntimeCriticalException.java} | 18 ++++--
 ...entException.java => PipeRuntimeException.java} | 18 ++++--
 ...n.java => PipeRuntimeNonCriticalException.java} | 18 ++++--
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    | 71 +++++++++++++++++-----
 .../impl/DataNodeInternalRPCServiceImpl.java       | 31 ++--------
 thrift/src/main/thrift/datanode.thrift             | 19 ++----
 23 files changed, 324 insertions(+), 258 deletions(-)
 copy pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/{PipeManagementException.java => PipeRuntimeCriticalException.java} (67%)
 copy pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/{PipeManagementException.java => PipeRuntimeException.java} (68%)
 copy pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/{PipeManagementException.java => PipeRuntimeNonCriticalException.java} (66%)


[iotdb] 03/04: fix start / stop process on cn procedure

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5787
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 08625c6483f95f6dc7ee2cd3f465e67fca60eac6
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon May 8 01:27:23 2023 +0800

    fix start / stop process on cn procedure
---
 .../confignode/persistence/pipe/PipeTaskInfo.java  | 28 ++++++++++++++++------
 .../pipe/task/AbstractOperatePipeProcedureV2.java  |  7 ++++--
 .../impl/pipe/task/CreatePipeProcedureV2.java      |  6 ++---
 3 files changed, 29 insertions(+), 12 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index c3975a817d7..fa8c1ae9cdb 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -81,11 +81,17 @@ public class PipeTaskInfo implements SnapshotProcessor {
       return false;
     }
 
-    if (getPipeStatus(pipeName) == PipeStatus.RUNNING) {
+    final PipeStatus pipeStatus = getPipeStatus(pipeName);
+    if (pipeStatus == PipeStatus.RUNNING) {
       LOGGER.info(
           String.format("Failed to start pipe [%s], the pipe is already running", pipeName));
       return false;
     }
+    if (pipeStatus == PipeStatus.DROPPED) {
+      LOGGER.info(
+          String.format("Failed to start pipe [%s], the pipe is already dropped", pipeName));
+      return false;
+    }
 
     return true;
   }
@@ -96,21 +102,29 @@ public class PipeTaskInfo implements SnapshotProcessor {
       return false;
     }
 
-    if (getPipeStatus(pipeName) == PipeStatus.STOPPED) {
+    final PipeStatus pipeStatus = getPipeStatus(pipeName);
+    if (pipeStatus == PipeStatus.STOPPED) {
       LOGGER.info(String.format("Failed to stop pipe [%s], the pipe is already stop", pipeName));
       return false;
     }
+    if (pipeStatus == PipeStatus.DROPPED) {
+      LOGGER.info(String.format("Failed to stop pipe [%s], the pipe is already dropped", pipeName));
+      return false;
+    }
 
     return true;
   }
 
   public boolean checkBeforeDropPipe(String pipeName) {
-    if (isPipeExisted(pipeName)) {
-      return true;
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "Check before drop pipe {}, pipe exists: {}.",
+          pipeName,
+          isPipeExisted(pipeName) ? "true" : "false");
     }
-
-    LOGGER.info(String.format("Failed to drop pipe [%s], the pipe does not exist", pipeName));
-    return false;
+    // no matter whether the pipe exists, we allow the drop operation to be executed on all nodes
+    // to ensure consistency
+    return true;
   }
 
   private boolean isPipeExisted(String pipeName) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
index 0d9f98101f8..7b2f5cac3ac 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
@@ -129,8 +129,11 @@ abstract class AbstractOperatePipeProcedureV2 extends AbstractNodeProcedure<Oper
       throws IOException, InterruptedException, ProcedureException {
     switch (state) {
       case VALIDATE_TASK:
-        rollbackFromValidateTask(env);
-        env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
+        try {
+          rollbackFromValidateTask(env);
+        } finally {
+          env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
+        }
         break;
       case CALCULATE_INFO_FOR_TASK:
         rollbackFromCalculateInfoForTask(env);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 995b2a977dd..d81a88fe534 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -103,9 +103,9 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
         .getLoadManager()
         .getRegionLeaderMap()
         .forEach(
-            (region, leader) -> {
-              consensusGroupIdToTaskMetaMap.put(region, new PipeTaskMeta(0, leader));
-            });
+            (region, leader) ->
+                // TODO: make index configurable
+                consensusGroupIdToTaskMetaMap.put(region, new PipeTaskMeta(0, leader)));
     pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);
   }
 


[iotdb] 01/04: merge master

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5787
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f39f67447738f09099bc14bc17b36303153493d1
Merge: a9ca134f3b6 478e4d17952
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun May 7 21:29:09 2023 +0800

    merge master

 .../confignode/client/DataNodeRequestType.java     |  7 +--
 .../client/async/AsyncDataNodeClientPool.java      | 15 ++---
 .../client/async/handlers/AsyncClientHandler.java  |  1 +
 .../confignode/persistence/pipe/PipeTaskInfo.java  |  4 ++
 .../procedure/env/ConfigNodeProcedureEnv.java      | 23 ++------
 .../pipe/task/AbstractOperatePipeProcedureV2.java  | 43 +++++++++++++-
 .../impl/pipe/task/CreatePipeProcedureV2.java      | 28 +---------
 .../impl/pipe/task/DropPipeProcedureV2.java        | 16 +-----
 .../impl/pipe/task/StartPipeProcedureV2.java       | 28 ++--------
 .../impl/pipe/task/StopPipeProcedureV2.java        | 28 ++--------
 .../iotdb/confignode/persistence/PipeInfoTest.java |  1 +
 .../iotdb/commons/pipe/task/meta/PipeMeta.java     | 10 +++-
 .../commons/pipe/task/meta/PipeMetaKeeper.java     |  4 ++
 .../commons/pipe/task/meta/PipeRuntimeMeta.java    | 20 +------
 .../commons/pipe/task/meta/PipeStaticMeta.java     | 65 +++++++++++-----------
 .../iotdb/commons/pipe/task/meta/PipeTaskMeta.java | 51 ++++++++++++++++-
 .../iotdb/pipe/api/customizer/PipeParameters.java  | 22 ++++++++
 .../exception/PipeRuntimeCriticalException.java    | 40 +++++++++++++
 .../pipe/api/exception/PipeRuntimeException.java   | 40 +++++++++++++
 .../exception/PipeRuntimeNonCriticalException.java | 40 +++++++++++++
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    |  1 +
 .../impl/DataNodeInternalRPCServiceImpl.java       | 31 ++---------
 thrift/src/main/thrift/datanode.thrift             | 19 ++-----
 23 files changed, 320 insertions(+), 217 deletions(-)

diff --cc node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
index d04549c4c37,14185dd8a5e..eab74083860
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
@@@ -32,12 -32,8 +32,8 @@@ import java.util.Map
  public class PipeStaticMeta {
  
    private String pipeName;
 -  private long createTime;
 +  private long creationTime;
  
-   private Map<String, String> collectorAttributes = new HashMap<>();
-   private Map<String, String> processorAttributes = new HashMap<>();
-   private Map<String, String> connectorAttributes = new HashMap<>();
- 
    private PipeParameters collectorParameters;
    private PipeParameters processorParameters;
    private PipeParameters connectorParameters;
@@@ -51,10 -47,7 +47,7 @@@
        Map<String, String> processorAttributes,
        Map<String, String> connectorAttributes) {
      this.pipeName = pipeName.toUpperCase();
 -    this.createTime = createTime;
 +    this.creationTime = creationTime;
-     this.collectorAttributes = collectorAttributes;
-     this.processorAttributes = processorAttributes;
-     this.connectorAttributes = connectorAttributes;
      collectorParameters = new PipeParameters(collectorAttributes);
      processorParameters = new PipeParameters(processorAttributes);
      connectorParameters = new PipeParameters(connectorAttributes);
@@@ -89,10 -82,10 +82,10 @@@
  
    public void serialize(DataOutputStream outputStream) throws IOException {
      ReadWriteIOUtils.write(pipeName, outputStream);
 -    ReadWriteIOUtils.write(createTime, outputStream);
 +    ReadWriteIOUtils.write(creationTime, outputStream);
  
-     outputStream.writeInt(collectorAttributes.size());
-     for (Map.Entry<String, String> entry : collectorAttributes.entrySet()) {
+     outputStream.writeInt(collectorParameters.getAttribute().size());
+     for (Map.Entry<String, String> entry : collectorParameters.getAttribute().entrySet()) {
        ReadWriteIOUtils.write(entry.getKey(), outputStream);
        ReadWriteIOUtils.write(entry.getValue(), outputStream);
      }
@@@ -117,12 -110,18 +110,18 @@@
      final PipeStaticMeta pipeStaticMeta = new PipeStaticMeta();
  
      pipeStaticMeta.pipeName = ReadWriteIOUtils.readString(byteBuffer);
 -    pipeStaticMeta.createTime = ReadWriteIOUtils.readLong(byteBuffer);
 +    pipeStaticMeta.creationTime = ReadWriteIOUtils.readLong(byteBuffer);
  
+     pipeStaticMeta.collectorParameters = new PipeParameters(new HashMap<>());
+     pipeStaticMeta.processorParameters = new PipeParameters(new HashMap<>());
+     pipeStaticMeta.connectorParameters = new PipeParameters(new HashMap<>());
+ 
      int size = byteBuffer.getInt();
      for (int i = 0; i < size; ++i) {
-       pipeStaticMeta.collectorAttributes.put(
-           ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readString(byteBuffer));
+       pipeStaticMeta
+           .collectorParameters
+           .getAttribute()
+           .put(ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readString(byteBuffer));
      }
      size = byteBuffer.getInt();
      for (int i = 0; i < size; ++i) {
@@@ -152,10 -151,10 +151,10 @@@
      }
      PipeStaticMeta that = (PipeStaticMeta) obj;
      return pipeName.equals(that.pipeName)
 -        && createTime == that.createTime
 +        && creationTime == that.creationTime
-         && collectorAttributes.equals(that.collectorAttributes)
-         && processorAttributes.equals(that.processorAttributes)
-         && connectorAttributes.equals(that.connectorAttributes);
+         && collectorParameters.equals(that.collectorParameters)
+         && processorParameters.equals(that.processorParameters)
+         && connectorParameters.equals(that.connectorParameters);
    }
  
    @Override
@@@ -169,14 -168,14 +168,14 @@@
          + "pipeName='"
          + pipeName
          + '\''
--        + ", createTime="
 -        + createTime
++        + ", creationTime="
 +        + creationTime
-         + ", collectorAttributes="
-         + collectorAttributes
-         + ", processorAttributes="
-         + processorAttributes
-         + ", connectorAttributes="
-         + connectorAttributes
+         + ", collectorParameters="
+         + collectorParameters.getAttribute()
+         + ", processorParameters="
+         + processorParameters.getAttribute()
+         + ", connectorParameters="
+         + connectorParameters.getAttribute()
          + '}';
    }
  }
diff --cc server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index fd401a325ef,9fffa867062..c9be5f146a1
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@@ -19,14 -19,8 +19,15 @@@
  
  package org.apache.iotdb.db.pipe.agent.task;
  
 +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
  import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
  import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
 +import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 +import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 +import org.apache.iotdb.pipe.api.exception.PipeManagementException;
++
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
  
  public class PipeTaskAgent {
  


[iotdb] 04/04: state transition diagram of a pipe task

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5787
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4987a74800ef45d2a19451e7e235a90d87d2baed
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon May 8 02:02:45 2023 +0800

    state transition diagram of a pipe task
---
 .../apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java    | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index e896cd765d7..c0df1e1761d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -28,6 +28,19 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * State transition diagram of a pipe task:
+ *
+ * <p><code>
+ * |----------------|                     |---------| --> start pipe --> |---------|                   |---------|
+ * | initial status | --> create pipe --> | STOPPED |                    | RUNNING | --> drop pipe --> | DROPPED |
+ * |----------------|                     |---------| <-- stop  pipe <-- |---------|                   |---------|
+ *                                             |                                                            |
+ *                                             | ----------------------> drop pipe -----------------------> |
+ * </code>
+ *
+ * <p>Other transitions are not allowed and will be ignored when received by the pipe task agent.
+ */
 public class PipeTaskAgent {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskAgent.class);
@@ -90,7 +103,7 @@ public class PipeTaskAgent {
     // add pipe meta to pipe meta keeper
     // note that we do not need to set the status of pipe meta here, because the status of pipe meta
     // is already set to STOPPED when it is created
-    pipeMetaKeeper.addPipeMeta(pipeMeta.getStaticMeta().getPipeName(), pipeMeta);
+    pipeMetaKeeper.addPipeMeta(pipeName, pipeMeta);
   }
 
   public void createPipeTaskByConsensusGroup(


[iotdb] 02/04: fix start / stop process

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5787
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4d2f0aeddcf60e2a92bc70d6152ecf31e5595b60
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon May 8 00:58:58 2023 +0800

    fix start / stop process
---
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    | 55 +++++++++++++++++-----
 1 file changed, 42 insertions(+), 13 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index c9be5f146a1..e896cd765d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
-import org.apache.iotdb.pipe.api.exception.PipeManagementException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,11 +58,12 @@ public class PipeTaskAgent {
             return;
           case DROPPED:
             LOGGER.info(
-                "Pipe {} (creation time = {}) has already been dropped. Current status = {}. Recreating.",
+                "Pipe {} (creation time = {}) has already been dropped, but the pipe task meta has not been cleaned up. "
+                    + "Current status = {}. Try dropping the pipe and recreating it.",
                 pipeName,
                 creationTime,
                 existedPipeMeta.getRuntimeMeta().getStatus().get().name());
-            // break to drop the pipe meta and recreate it
+            // break to drop the pipe and recreate it
             break;
           default:
             throw new IllegalStateException(
@@ -137,6 +137,33 @@ public class PipeTaskAgent {
   public void dropPipeTaskByConsensusGroup(
       String pipeName, long creationTime, TConsensusGroupId consensusGroupId) {}
 
+  public void dropPipe(String pipeName) {
+    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+
+    if (existedPipeMeta == null) {
+      LOGGER.info(
+          "Pipe {} has already been dropped or has not been created. Skip dropping.", pipeName);
+      return;
+    }
+
+    // mark pipe meta as dropped first. this will help us detect if the pipe meta has been dropped
+    // but the pipe task meta has not been cleaned up (in case of failure when executing
+    // dropPipeTaskByConsensusGroup).
+    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
+    // drop pipe task by consensus group
+    existedPipeMeta
+        .getRuntimeMeta()
+        .getConsensusGroupIdToTaskMetaMap()
+        .forEach(
+            ((consensusGroupId, pipeTaskMeta) -> {
+              dropPipeTaskByConsensusGroup(pipeName, consensusGroupId);
+            }));
+    // remove pipe meta from pipe meta keeper
+    pipeMetaKeeper.removePipeMeta(pipeName);
+  }
+
+  public void dropPipeTaskByConsensusGroup(String pipeName, TConsensusGroupId consensusGroupId) {}
+
   public void startPipe(String pipeName, long creationTime) {
     final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
 
@@ -148,11 +175,12 @@ public class PipeTaskAgent {
       return;
     }
     if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) {
-      throw new PipeManagementException(
-          String.format(
-              "Inconsistency between pipe meta and startPipe request detected. "
-                  + "Pipe %s (creation time = %d) has been created but does not match the creation time (%d) in startPipe request.",
-              pipeName, existedPipeMeta.getStaticMeta().getCreationTime(), creationTime));
+      LOGGER.info(
+          "Pipe {} (creation time = {}) has been created but does not match the creation time ({}) in startPipe request. Skip starting.",
+          pipeName,
+          existedPipeMeta.getStaticMeta().getCreationTime(),
+          creationTime);
+      return;
     }
 
     switch (existedPipeMeta.getRuntimeMeta().getStatus().get()) {
@@ -208,11 +236,12 @@ public class PipeTaskAgent {
       return;
     }
     if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) {
-      throw new PipeManagementException(
-          String.format(
-              "Inconsistency between pipe meta and stopPipe request detected. "
-                  + "Pipe %s (creation time = %d) has been created but does not match the creation time (%d) in stopPipe request.",
-              pipeName, existedPipeMeta.getStaticMeta().getCreationTime(), creationTime));
+      LOGGER.info(
+          "Pipe {} (creation time = {}) has been created but does not match the creation time ({}) in stopPipe request. Skip stopping.",
+          pipeName,
+          existedPipeMeta.getStaticMeta().getCreationTime(),
+          creationTime);
+      return;
     }
 
     switch (existedPipeMeta.getRuntimeMeta().getStatus().get()) {