You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/23 18:47:44 UTC

[iotdb] branch master updated: [IOTDB-5912] Pipe: Handle PipeMeta changes through heartbeat (#9922)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e46be0e9623 [IOTDB-5912] Pipe: Handle PipeMeta changes through heartbeat (#9922)
e46be0e9623 is described below

commit e46be0e9623791cf590230aa0fe965ecf3e0c103
Author: Itami Sho <42...@users.noreply.github.com>
AuthorDate: Wed May 24 02:47:35 2023 +0800

    [IOTDB-5912] Pipe: Handle PipeMeta changes through heartbeat (#9922)
    
    Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
 .../heartbeat/DataNodeHeartbeatHandler.java        |  10 +-
 .../consensus/request/ConfigPhysicalPlan.java      |   6 +-
 .../consensus/request/ConfigPhysicalPlanType.java  |   1 +
 .../PipeHandleLeaderChangePlan.java                |   2 +-
 .../pipe/runtime/PipeHandleMetaChangePlan.java     |  67 +++++
 .../iotdb/confignode/manager/ProcedureManager.java |  22 ++
 .../manager/load/service/HeartbeatService.java     |  16 +-
 .../pipe/runtime/PipeRuntimeCoordinator.java       |  21 ++
 .../persistence/executor/ConfigPlanExecutor.java   |   7 +-
 .../confignode/persistence/pipe/PipeTaskInfo.java  |  13 +-
 .../persistence/pipe/PipeTaskOperation.java        |   2 +-
 .../runtime/PipeHandleLeaderChangeProcedure.java   |   2 +-
 .../runtime/PipeHandleMetaChangeProcedure.java     | 285 +++++++++++++++++++++
 .../procedure/store/ProcedureFactory.java          |   6 +
 .../confignode/procedure/store/ProcedureType.java  |   3 +-
 .../request/ConfigPhysicalPlanSerDeTest.java       |  44 +++-
 .../iotdb/confignode/persistence/PipeInfoTest.java |   1 -
 .../runtime/PipeHandleMetaChangeProcedureTest.java |  96 +++++++
 .../iotdb/commons/pipe/task/meta/PipeTaskMeta.java |  13 +-
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    |  31 +++
 .../impl/DataNodeInternalRPCServiceImpl.java       |   4 +
 thrift/src/main/thrift/datanode.thrift             |   2 +
 22 files changed, 629 insertions(+), 25 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index a7348e5d6af..8cfcda8c87e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
 import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
 import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.pipe.runtime.PipeRuntimeCoordinator;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
 import org.apache.iotdb.tsfile.utils.Pair;
 
@@ -42,13 +43,16 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR
 
   private final Consumer<Map<Integer, Long>> schemaQuotaRespProcess;
 
+  private final PipeRuntimeCoordinator pipeRuntimeCoordinator;
+
   public DataNodeHeartbeatHandler(
       int nodeId,
       LoadCache loadCache,
       Map<Integer, Long> deviceNum,
       Map<Integer, Long> timeSeriesNum,
       Map<Integer, Long> regionDisk,
-      Consumer<Map<Integer, Long>> schemaQuotaRespProcess) {
+      Consumer<Map<Integer, Long>> schemaQuotaRespProcess,
+      PipeRuntimeCoordinator pipeRuntimeCoordinator) {
 
     this.nodeId = nodeId;
     this.loadCache = loadCache;
@@ -56,6 +60,7 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR
     this.timeSeriesNum = timeSeriesNum;
     this.regionDisk = regionDisk;
     this.schemaQuotaRespProcess = schemaQuotaRespProcess;
+    this.pipeRuntimeCoordinator = pipeRuntimeCoordinator;
   }
 
   @Override
@@ -106,6 +111,9 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR
           break;
       }
     }
+    if (heartbeatResp.getPipeMetaList() != null) {
+      pipeRuntimeCoordinator.parseHeartbeat(nodeId, heartbeatResp.getPipeMetaList());
+    }
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index 2991f05ea65..ff915a93e97 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -77,9 +77,10 @@ import org.apache.iotdb.confignode.consensus.request.write.model.UpdateModelStat
 import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
 import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
 import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
-import org.apache.iotdb.confignode.consensus.request.write.pipe.coordinator.PipeHandleLeaderChangePlan;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
@@ -396,6 +397,9 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest {
         case PipeHandleLeaderChange:
           plan = new PipeHandleLeaderChangePlan();
           break;
+        case PipeHandleMetaChange:
+          plan = new PipeHandleMetaChangePlan();
+          break;
         case GetRegionId:
           plan = new GetRegionIdPlan();
           break;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index d36a9973cd5..b4ab2ad4953 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -182,6 +182,7 @@ public enum ConfigPhysicalPlanType {
 
   /** Pipe Runtime */
   PipeHandleLeaderChange((short) 1600),
+  PipeHandleMetaChange((short) 1601),
   ;
 
   private final short planType;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/coordinator/PipeHandleLeaderChangePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/runtime/PipeHandleLeaderChangePlan.java
similarity index 99%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/coordinator/PipeHandleLeaderChangePlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/runtime/PipeHandleLeaderChangePlan.java
index 8686e6e75f8..207cbe34b04 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/coordinator/PipeHandleLeaderChangePlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/runtime/PipeHandleLeaderChangePlan.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.consensus.request.write.pipe.coordinator;
+package org.apache.iotdb.confignode.consensus.request.write.pipe.runtime;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/runtime/PipeHandleMetaChangePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/runtime/PipeHandleMetaChangePlan.java
new file mode 100644
index 00000000000..983574618ac
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/runtime/PipeHandleMetaChangePlan.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write.pipe.runtime;
+
+import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class PipeHandleMetaChangePlan extends ConfigPhysicalPlan {
+
+  private List<PipeMeta> pipeMetaList = new ArrayList<>();
+
+  public PipeHandleMetaChangePlan() {
+    super(ConfigPhysicalPlanType.PipeHandleMetaChange);
+  }
+
+  public PipeHandleMetaChangePlan(List<PipeMeta> pipeMetaList) {
+    super(ConfigPhysicalPlanType.PipeHandleMetaChange);
+    this.pipeMetaList = pipeMetaList;
+  }
+
+  public List<PipeMeta> getPipeMetaList() {
+    return pipeMetaList;
+  }
+
+  @Override
+  protected void serializeImpl(DataOutputStream stream) throws IOException {
+    stream.writeShort(getType().getPlanType());
+
+    stream.writeInt(pipeMetaList.size());
+    for (PipeMeta pipeMeta : pipeMetaList) {
+      pipeMeta.serialize(stream);
+    }
+  }
+
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+    int size = buffer.getInt();
+    for (int i = 0; i < size; i++) {
+      PipeMeta pipeMeta = PipeMeta.deserialize(buffer);
+      pipeMetaList.add(pipeMeta);
+    }
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 4d67bd09a0e..a33072f8b19 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -53,6 +53,7 @@ import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
 import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
 import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
 import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleLeaderChangeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleMetaChangeProcedure;
 import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeMetaSyncProcedure;
 import org.apache.iotdb.confignode.procedure.impl.pipe.task.CreatePipeProcedureV2;
 import org.apache.iotdb.confignode.procedure.impl.pipe.task.DropPipeProcedureV2;
@@ -87,6 +88,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.Pair;
 
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -772,6 +774,26 @@ public class ProcedureManager {
     }
   }
 
+  public TSStatus pipeHandleMetaChange(
+      int dataNodeId, @NotNull List<ByteBuffer> pipeMetaByteBufferListFromDataNode) {
+    try {
+      long procedureId =
+          executor.submitProcedure(
+              new PipeHandleMetaChangeProcedure(dataNodeId, pipeMetaByteBufferListFromDataNode));
+      List<TSStatus> statusList = new ArrayList<>();
+      boolean isSucceed =
+          waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
+      if (isSucceed) {
+        return RpcUtils.SUCCESS_STATUS;
+      } else {
+        return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
+            .setMessage(statusList.get(0).getMessage());
+      }
+    } catch (Exception e) {
+      return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
+    }
+  }
+
   /**
    * Waiting until the specific procedures finished
    *
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
index 6e2eb055428..38a6d8dc5f8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
@@ -43,7 +43,7 @@ import java.util.Optional;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 /** Maintain the Cluster-Heartbeat-Service. */
 public class HeartbeatService {
@@ -63,7 +63,7 @@ public class HeartbeatService {
   private Future<?> currentHeartbeatFuture;
   private final ScheduledExecutorService heartBeatExecutor =
       IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-Heartbeat-Service");
-  private final AtomicInteger heartbeatCounter = new AtomicInteger(0);
+  private final AtomicLong heartbeatCounter = new AtomicLong(0);
 
   public HeartbeatService(IManager configManager, LoadCache loadCache) {
     this.configManager = configManager;
@@ -124,14 +124,17 @@ public class HeartbeatService {
     // We sample DataNode's load in every 10 heartbeat loop
     heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % 10 == 0);
     heartbeatReq.setSchemaQuotaCount(configManager.getClusterSchemaManager().getSchemaQuotaCount());
-
-    /* Update heartbeat counter */
-    heartbeatCounter.getAndUpdate(x -> (x + 1) % 10);
+    // We collect pipe meta in every 100 heartbeat loop, TODO: make this configurable
+    heartbeatReq.setNeedPipeMetaList(heartbeatCounter.get() % 100 == 0);
     if (!configManager.getClusterQuotaManager().hasSpaceQuotaLimit()) {
       heartbeatReq.setSchemaRegionIds(configManager.getClusterQuotaManager().getSchemaRegionIds());
       heartbeatReq.setDataRegionIds(configManager.getClusterQuotaManager().getDataRegionIds());
       heartbeatReq.setSpaceQuotaUsage(configManager.getClusterQuotaManager().getSpaceQuotaUsage());
     }
+
+    /* Update heartbeat counter */
+    heartbeatCounter.getAndIncrement();
+
     return heartbeatReq;
   }
 
@@ -175,7 +178,8 @@ public class HeartbeatService {
               configManager.getClusterQuotaManager().getDeviceNum(),
               configManager.getClusterQuotaManager().getTimeSeriesNum(),
               configManager.getClusterQuotaManager().getRegionDisk(),
-              configManager.getClusterSchemaManager()::updateSchemaQuota);
+              configManager.getClusterSchemaManager()::updateSchemaQuota,
+              configManager.getPipeManager().getPipeRuntimeCoordinator());
       configManager.getClusterQuotaManager().updateSpaceQuotaUsage();
       AsyncDataNodeHeartbeatClientPool.getInstance()
           .getDataNodeHeartBeat(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
index 9a117a0fbd1..3455a6c1369 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
@@ -29,10 +29,13 @@ import org.apache.iotdb.confignode.manager.load.subscriber.StatisticsChangeEvent
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.Pair;
 
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class PipeRuntimeCoordinator implements IClusterStatusSubscriber {
@@ -95,4 +98,22 @@ public class PipeRuntimeCoordinator implements IClusterStatusSubscriber {
   public void stopPipeMetaSync() {
     pipeMetaSyncer.stop();
   }
+
+  /**
+   * parse heartbeat from data node.
+   *
+   * @param dataNodeId data node id
+   * @param pipeMetaByteBufferListFromDataNode pipe meta byte buffer list collected from data node
+   */
+  public void parseHeartbeat(
+      int dataNodeId, @NotNull List<ByteBuffer> pipeMetaByteBufferListFromDataNode) {
+    final TSStatus result =
+        configManager
+            .getProcedureManager()
+            .pipeHandleMetaChange(dataNodeId, pipeMetaByteBufferListFromDataNode);
+    if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      LOGGER.warn(
+          "PipeTaskCoordinator meets error in handling pipe meta change, status: ({})", result);
+    }
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 2f812b173b2..2cd14b69fc8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -75,9 +75,10 @@ import org.apache.iotdb.confignode.consensus.request.write.model.UpdateModelStat
 import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
 import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
 import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
-import org.apache.iotdb.confignode.consensus.request.write.pipe.coordinator.PipeHandleLeaderChangePlan;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
@@ -408,6 +409,10 @@ public class ConfigPlanExecutor {
         return pipeInfo
             .getPipeTaskInfo()
             .handleLeaderChange((PipeHandleLeaderChangePlan) physicalPlan);
+      case PipeHandleMetaChange:
+        return pipeInfo
+            .getPipeTaskInfo()
+            .handleMetaChanges((PipeHandleMetaChangePlan) physicalPlan);
       case ADD_CQ:
         return cqInfo.addCQ((AddCQPlan) physicalPlan);
       case DROP_CQ:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 5ac5b7fca21..76ec9007ffa 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -25,7 +25,8 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
-import org.apache.iotdb.confignode.consensus.request.write.pipe.coordinator.PipeHandleLeaderChangePlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
@@ -231,6 +232,16 @@ public class PipeTaskInfo implements SnapshotProcessor {
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
+  public TSStatus handleMetaChanges(PipeHandleMetaChangePlan plan) {
+    pipeMetaKeeper.clear();
+    plan.getPipeMetaList()
+        .forEach(
+            pipeMeta -> {
+              pipeMetaKeeper.addPipeMeta(pipeMeta.getStaticMeta().getPipeName(), pipeMeta);
+            });
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
   /////////////////////////////// Snapshot ///////////////////////////////
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskOperation.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskOperation.java
index e471329c8e6..6531f84e049 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskOperation.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskOperation.java
@@ -26,5 +26,5 @@ public enum PipeTaskOperation {
   DROP_PIPE,
   HANDLE_LEADER_CHANGE,
   SYNC_PIPE_META,
-  ;
+  HANDLE_PIPE_META_CHANGE
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index 3fbe3091101..0c218184b5c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.confignode.consensus.request.write.pipe.coordinator.PipeHandleLeaderChangePlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan;
 import org.apache.iotdb.confignode.persistence.pipe.PipeTaskOperation;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.impl.pipe.task.AbstractOperatePipeProcedureV2;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
new file mode 100644
index 00000000000..db04d909f7d
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskOperation;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.impl.pipe.task.AbstractOperatePipeProcedureV2;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+import org.apache.iotdb.pipe.api.exception.PipeRuntimeCriticalException;
+import org.apache.iotdb.pipe.api.exception.PipeRuntimeException;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV2 {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeHandleMetaChangeProcedure.class);
+
+  private int dataNodeId;
+  private final List<ByteBuffer> pipeMetaByteBufferListFromDataNode;
+
+  private boolean needWriteConsensusOnConfigNodes = false;
+  private boolean needPushPipeMetaToDataNodes = false;
+
+  public PipeHandleMetaChangeProcedure() {
+    super();
+    pipeMetaByteBufferListFromDataNode = new ArrayList<>();
+  }
+
+  public PipeHandleMetaChangeProcedure(
+      int dataNodeId, @NotNull List<ByteBuffer> pipeMetaByteBufferListFromDataNode) {
+    super();
+    this.dataNodeId = dataNodeId;
+    this.pipeMetaByteBufferListFromDataNode = pipeMetaByteBufferListFromDataNode;
+    needWriteConsensusOnConfigNodes = false;
+    needPushPipeMetaToDataNodes = false;
+  }
+
+  @Override
+  protected PipeTaskOperation getOperation() {
+    return PipeTaskOperation.HANDLE_PIPE_META_CHANGE;
+  }
+
+  @Override
+  protected void executeFromValidateTask(ConfigNodeProcedureEnv env) {
+    LOGGER.info("PipeHandleMetaChangeProcedure: executeFromValidateTask");
+
+    // do nothing
+  }
+
+  @Override
+  protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
+    LOGGER.info("PipeHandleMetaChangeProcedure: executeFromCalculateInfoForTask");
+
+    final Map<PipeStaticMeta, PipeMeta> pipeMetaMapFromDataNode = new HashMap<>();
+    for (ByteBuffer byteBuffer : pipeMetaByteBufferListFromDataNode) {
+      final PipeMeta pipeMeta = PipeMeta.deserialize(byteBuffer);
+      pipeMetaMapFromDataNode.put(pipeMeta.getStaticMeta(), pipeMeta);
+    }
+
+    for (final PipeMeta pipeMetaOnConfigNode :
+        env.getConfigManager()
+            .getPipeManager()
+            .getPipeTaskCoordinator()
+            .getPipeTaskInfo()
+            .getPipeMetaList()) {
+      final PipeMeta pipeMetaFromDataNode =
+          pipeMetaMapFromDataNode.get(pipeMetaOnConfigNode.getStaticMeta());
+      if (pipeMetaFromDataNode == null) {
+        LOGGER.warn(
+            "PipeRuntimeCoordinator meets error in updating pipeMetaKeeper, "
+                + "pipeMetaFromDataNode is null, pipeMetaOnConfigNode: {}",
+            pipeMetaOnConfigNode);
+        continue;
+      }
+
+      final Map<TConsensusGroupId, PipeTaskMeta> pipeTaskMetaMapOnConfigNode =
+          pipeMetaOnConfigNode.getRuntimeMeta().getConsensusGroupIdToTaskMetaMap();
+      final Map<TConsensusGroupId, PipeTaskMeta> pipeTaskMetaMapFromDataNode =
+          pipeMetaFromDataNode.getRuntimeMeta().getConsensusGroupIdToTaskMetaMap();
+      for (final Map.Entry<TConsensusGroupId, PipeTaskMeta> runtimeMetaOnConfigNode :
+          pipeTaskMetaMapOnConfigNode.entrySet()) {
+        if (runtimeMetaOnConfigNode.getValue().getRegionLeader() != dataNodeId) {
+          continue;
+        }
+
+        final PipeTaskMeta runtimeMetaFromDataNode =
+            pipeTaskMetaMapFromDataNode.get(runtimeMetaOnConfigNode.getKey());
+        if (runtimeMetaFromDataNode == null) {
+          LOGGER.warn(
+              "PipeRuntimeCoordinator meets error in updating pipeMetaKeeper, "
+                  + "runtimeMetaFromDataNode is null, runtimeMetaOnConfigNode: {}",
+              runtimeMetaOnConfigNode);
+          continue;
+        }
+
+        // update progress index
+        if (runtimeMetaOnConfigNode.getValue().getProgressIndex()
+            < runtimeMetaFromDataNode.getProgressIndex()) {
+          runtimeMetaOnConfigNode
+              .getValue()
+              .setProgressIndex(runtimeMetaFromDataNode.getProgressIndex());
+          needWriteConsensusOnConfigNodes = true;
+        }
+
+        // update runtime exception
+        final PipeTaskMeta pipeTaskMetaOnConfigNode = runtimeMetaOnConfigNode.getValue();
+        pipeTaskMetaOnConfigNode.clearExceptionMessages();
+        for (final PipeRuntimeException exception :
+            runtimeMetaFromDataNode.getExceptionMessages()) {
+          pipeTaskMetaOnConfigNode.trackExceptionMessage(exception);
+          if (exception instanceof PipeRuntimeCriticalException) {
+            pipeMetaOnConfigNode.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
+            needWriteConsensusOnConfigNodes = true;
+            needPushPipeMetaToDataNodes = true;
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  protected void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
+    LOGGER.info("PipeHandleMetaChangeProcedure: executeFromWriteConfigNodeConsensus");
+
+    if (!needWriteConsensusOnConfigNodes) {
+      return;
+    }
+
+    final List<PipeMeta> pipeMetaList = new ArrayList<>();
+    for (final PipeMeta pipeMeta :
+        env.getConfigManager()
+            .getPipeManager()
+            .getPipeTaskCoordinator()
+            .getPipeTaskInfo()
+            .getPipeMetaList()) {
+      pipeMetaList.add(pipeMeta);
+    }
+
+    final ConsensusWriteResponse response =
+        env.getConfigManager()
+            .getConsensusManager()
+            .write(new PipeHandleMetaChangePlan(pipeMetaList));
+    if (!response.isSuccessful()) {
+      throw new PipeManagementException(response.getErrorMessage());
+    }
+  }
+
+  @Override
+  protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException {
+    LOGGER.info("PipeHandleMetaChangeProcedure: executeFromHandleOnDataNodes");
+
+    if (!needPushPipeMetaToDataNodes) {
+      return;
+    }
+
+    pushPipeMetaToDataNodes(env);
+  }
+
+  @Override
+  protected void rollbackFromValidateTask(ConfigNodeProcedureEnv env) {
+    LOGGER.info("PipeHandleMetaChangeProcedure: rollbackFromValidateTask");
+
+    // do nothing
+  }
+
+  @Override
+  protected void rollbackFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
+    LOGGER.info("PipeHandleMetaChangeProcedure: rollbackFromCalculateInfoForTask");
+
+    // do nothing
+  }
+
+  @Override
+  protected void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
+    LOGGER.info("PipeHandleMetaChangeProcedure: rollbackFromWriteConfigNodeConsensus");
+
+    // do nothing
+  }
+
+  @Override
+  protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
+    LOGGER.info("PipeHandleMetaChangeProcedure: rollbackFromOperateOnDataNodes");
+
+    // do nothing
+  }
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.writeShort(ProcedureType.PIPE_HANDLE_META_CHANGE_PROCEDURE.getTypeCode());
+    super.serialize(stream);
+
+    ReadWriteIOUtils.write(dataNodeId, stream);
+
+    ReadWriteIOUtils.write(pipeMetaByteBufferListFromDataNode.size(), stream);
+    for (ByteBuffer pipeMetaByteBuffer : pipeMetaByteBufferListFromDataNode) {
+      ReadWriteIOUtils.write(pipeMetaByteBuffer.limit(), stream);
+      ReadWriteIOUtils.write(new Binary(pipeMetaByteBuffer.array()), stream);
+    }
+
+    ReadWriteIOUtils.write(needWriteConsensusOnConfigNodes, stream);
+    ReadWriteIOUtils.write(needPushPipeMetaToDataNodes, stream);
+  }
+
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    super.deserialize(byteBuffer);
+
+    dataNodeId = ReadWriteIOUtils.readInt(byteBuffer);
+
+    final int size = ReadWriteIOUtils.readInt(byteBuffer);
+    for (int i = 0; i < size; ++i) {
+      final int limit = ReadWriteIOUtils.readInt(byteBuffer);
+      final ByteBuffer pipeMetaByteBuffer =
+          ByteBuffer.wrap(ReadWriteIOUtils.readBinary(byteBuffer).getValues());
+      pipeMetaByteBuffer.limit(limit);
+      pipeMetaByteBufferListFromDataNode.add(pipeMetaByteBuffer);
+    }
+
+    needWriteConsensusOnConfigNodes = ReadWriteIOUtils.readBool(byteBuffer);
+    needPushPipeMetaToDataNodes = ReadWriteIOUtils.readBool(byteBuffer);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof PipeHandleMetaChangeProcedure)) {
+      return false;
+    }
+    PipeHandleMetaChangeProcedure that = (PipeHandleMetaChangeProcedure) o;
+    return dataNodeId == that.dataNodeId
+        && needWriteConsensusOnConfigNodes == that.needWriteConsensusOnConfigNodes
+        && needPushPipeMetaToDataNodes == that.needPushPipeMetaToDataNodes
+        && Objects.equals(
+            pipeMetaByteBufferListFromDataNode, that.pipeMetaByteBufferListFromDataNode);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(
+        dataNodeId,
+        pipeMetaByteBufferListFromDataNode,
+        needWriteConsensusOnConfigNodes,
+        needPushPipeMetaToDataNodes);
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index 333b3e45f74..ab180dc8026 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
 import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
 import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
 import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleLeaderChangeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleMetaChangeProcedure;
 import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeMetaSyncProcedure;
 import org.apache.iotdb.confignode.procedure.impl.pipe.task.CreatePipeProcedureV2;
 import org.apache.iotdb.confignode.procedure.impl.pipe.task.DropPipeProcedureV2;
@@ -127,6 +128,9 @@ public class ProcedureFactory implements IProcedureFactory {
       case PIPE_META_SYNC_PROCEDURE:
         procedure = new PipeMetaSyncProcedure();
         break;
+      case PIPE_HANDLE_META_CHANGE_PROCEDURE:
+        procedure = new PipeHandleMetaChangeProcedure();
+        break;
       case CREATE_CQ_PROCEDURE:
         procedure =
             new CreateCQProcedure(
@@ -216,6 +220,8 @@ public class ProcedureFactory implements IProcedureFactory {
       return ProcedureType.PIPE_HANDLE_LEADER_CHANGE_PROCEDURE;
     } else if (procedure instanceof PipeMetaSyncProcedure) {
       return ProcedureType.PIPE_META_SYNC_PROCEDURE;
+    } else if (procedure instanceof PipeHandleMetaChangeProcedure) {
+      return ProcedureType.PIPE_HANDLE_META_CHANGE_PROCEDURE;
     }
     return null;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
index 39541414d11..12839b5b412 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
@@ -74,7 +74,8 @@ public enum ProcedureType {
 
   /** Pipe Runtime */
   PIPE_HANDLE_LEADER_CHANGE_PROCEDURE((short) 1100),
-  PIPE_META_SYNC_PROCEDURE((short) 1101);
+  PIPE_META_SYNC_PROCEDURE((short) 1101),
+  PIPE_HANDLE_META_CHANGE_PROCEDURE((short) 1102);
 
   private final short typeCode;
 
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index ef8fdef3994..9a83e00acd7 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.commons.partition.SchemaPartitionTable;
 import org.apache.iotdb.commons.partition.SeriesPartitionTable;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
@@ -91,9 +92,10 @@ import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNo
 import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
 import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
-import org.apache.iotdb.confignode.consensus.request.write.pipe.coordinator.PipeHandleLeaderChangePlan;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
@@ -1136,6 +1138,46 @@ public class ConfigPhysicalPlanSerDeTest {
         pipeHandleLeaderChangePlan1.getConsensusGroupId2NewDataRegionLeaderIdMap());
   }
 
+  @Test
+  public void pipeHandleMetaChangePlanTest() throws IOException {
+    List<PipeMeta> pipeMetaList = new ArrayList<>();
+    PipeStaticMeta pipeStaticMeta =
+        new PipeStaticMeta(
+            "pipeName",
+            123L,
+            new HashMap() {
+              {
+                put("collector-key", "collector-value");
+              }
+            },
+            new HashMap() {
+              {
+                put("processor-key-1", "processor-value-1");
+                put("processor-key-2", "processor-value-2");
+              }
+            },
+            new HashMap() {});
+    PipeRuntimeMeta pipeRuntimeMeta =
+        new PipeRuntimeMeta(
+            new HashMap() {
+              {
+                put(
+                    new TConsensusGroupId(TConsensusGroupType.DataRegion, 456),
+                    new PipeTaskMeta(789, 987));
+                put(
+                    new TConsensusGroupId(TConsensusGroupType.DataRegion, 123),
+                    new PipeTaskMeta(456, 789));
+              }
+            });
+    pipeMetaList.add(new PipeMeta(pipeStaticMeta, pipeRuntimeMeta));
+    PipeHandleMetaChangePlan pipeHandleMetaChangePlan1 = new PipeHandleMetaChangePlan(pipeMetaList);
+    PipeHandleMetaChangePlan pipeHandleMetaChangePlan2 =
+        (PipeHandleMetaChangePlan)
+            ConfigPhysicalPlan.Factory.create(pipeHandleMetaChangePlan1.serializeToByteBuffer());
+    Assert.assertEquals(
+        pipeHandleMetaChangePlan1.getPipeMetaList(), pipeHandleMetaChangePlan2.getPipeMetaList());
+  }
+
   @Test
   public void GetTriggerTablePlanTest() throws IOException {
     GetTriggerTablePlan getTriggerTablePlan0 = new GetTriggerTablePlan(true);
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
index a547fdde521..0246759c98c 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
@@ -73,7 +73,6 @@ public class PipeInfoTest {
     processorAttributes.put("processor", "org.apache.iotdb.pipe.processor.SDTFilterProcessor");
     connectorAttributes.put("connector", "org.apache.iotdb.pipe.protocal.ThriftTransporter");
     PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(0, 1);
-    pipeTaskMeta.trackException(true, "someError");
     Map<TConsensusGroupId, PipeTaskMeta> pipeTasks = new HashMap<>();
     pipeTasks.put(new TConsensusGroupId(DataRegion, 1), pipeTaskMeta);
     PipeStaticMeta pipeStaticMeta =
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
new file mode 100644
index 00000000000..07d74e2d9ff
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class PipeHandleMetaChangeProcedureTest {
+
+  @Test
+  public void serializeDeserializeTest() throws IOException {
+    PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+
+    PipeStaticMeta pipeStaticMeta =
+        new PipeStaticMeta(
+            "pipeName",
+            123L,
+            new HashMap() {
+              {
+                put("collector-key", "collector-value");
+              }
+            },
+            new HashMap() {
+              {
+                put("processor-key-1", "processor-value-1");
+                put("processor-key-2", "processor-value-2");
+              }
+            },
+            new HashMap() {});
+    PipeRuntimeMeta pipeRuntimeMeta =
+        new PipeRuntimeMeta(
+            new HashMap() {
+              {
+                put(
+                    new TConsensusGroupId(TConsensusGroupType.DataRegion, 456),
+                    new PipeTaskMeta(789, 987));
+                put(
+                    new TConsensusGroupId(TConsensusGroupType.DataRegion, 123),
+                    new PipeTaskMeta(456, 789));
+              }
+            });
+
+    PipeHandleMetaChangeProcedure proc =
+        new PipeHandleMetaChangeProcedure(
+            123,
+            Collections.singletonList(new PipeMeta(pipeStaticMeta, pipeRuntimeMeta).serialize()));
+
+    try {
+      proc.serialize(outputStream);
+      ByteBuffer buffer =
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+      PipeHandleMetaChangeProcedure proc2 =
+          (PipeHandleMetaChangeProcedure) ProcedureFactory.getInstance().create(buffer);
+
+      assertEquals(proc, proc2);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
index 213c3da3a22..bb5149fb1f2 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
@@ -30,7 +30,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -63,16 +62,12 @@ public class PipeTaskMeta {
     return exceptionMessages;
   }
 
-  public void mergeExceptionMessages(
-      Collection<? extends PipeRuntimeException> newExceptionMessages) {
-    exceptionMessages.addAll(newExceptionMessages);
+  public void trackExceptionMessage(PipeRuntimeException exceptionMessage) {
+    exceptionMessages.add(exceptionMessage);
   }
 
-  public void trackException(boolean critical, String message) {
-    exceptionMessages.add(
-        critical
-            ? new PipeRuntimeCriticalException(message)
-            : new PipeRuntimeNonCriticalException(message));
+  public void clearExceptionMessages() {
+    exceptionMessages.clear();
   }
 
   public void setProgressIndex(long progressIndex) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index e231d818dbf..2ff33b450c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -31,12 +31,18 @@ import org.apache.iotdb.db.pipe.task.PipeBuilder;
 import org.apache.iotdb.db.pipe.task.PipeTask;
 import org.apache.iotdb.db.pipe.task.PipeTaskBuilder;
 import org.apache.iotdb.db.pipe.task.PipeTaskManager;
+import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
+import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
 
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.validation.constraints.NotNull;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -430,6 +436,12 @@ public class PipeTaskAgent {
 
     // set pipe meta status to RUNNING
     existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.RUNNING);
+    // clear exception messages if started successfully
+    existedPipeMeta
+        .getRuntimeMeta()
+        .getConsensusGroupIdToTaskMetaMap()
+        .values()
+        .forEach(PipeTaskMeta::clearExceptionMessages);
   }
 
   private void stopPipe(String pipeName, long creationTime) {
@@ -539,4 +551,23 @@ public class PipeTaskAgent {
       pipeTask.stop();
     }
   }
+
+  ///////////////////////// Heartbeat /////////////////////////
+
+  public synchronized void collectPipeMetaList(THeartbeatReq req, THeartbeatResp resp)
+      throws TException {
+    if (!req.isNeedPipeMetaList()) {
+      return;
+    }
+
+    final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
+    try {
+      for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
+        pipeMetaBinaryList.add(pipeMeta.serialize());
+      }
+    } catch (IOException e) {
+      throw new TException(e);
+    }
+    resp.setPipeMetaList(pipeMetaBinaryList);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 86feb49b8db..af62026464e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1023,6 +1023,10 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     }
     // Update schema quota if necessary
     SchemaEngine.getInstance().updateAndFillSchemaCountMap(req.schemaQuotaCount, resp);
+
+    // Update pipe meta if necessary
+    PipeAgent.task().collectPipeMetaList(req, resp);
+
     return resp;
   }
 
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index d467625f485..d519a2e80e4 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -248,6 +248,7 @@ struct THeartbeatReq {
   5: optional list<i32> schemaRegionIds
   6: optional list<i32> dataRegionIds
   7: optional map<string, common.TSpaceQuota> spaceQuotaUsage
+  8: optional bool needPipeMetaList
 }
 
 struct THeartbeatResp {
@@ -261,6 +262,7 @@ struct THeartbeatResp {
   8: optional map<i32, i64> regionDisk
   // TODO: schemaLimitLevel can be removed if confignode support hot load configuration
   9: optional TSchemaLimitLevel schemaLimitLevel
+  10: optional list<binary> pipeMetaList
 }
 
 enum TSchemaLimitLevel{