You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/10/25 06:28:04 UTC

[iotdb] branch master updated: [IOTDB-4678] Sync DataNode start-up process (#7691)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bcb19da0c [IOTDB-4678] Sync DataNode start-up process (#7691)
2bcb19da0c is described below

commit 2bcb19da0cc15c75d4b0b5a5a445a19106123889
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Tue Oct 25 14:27:59 2022 +0800

    [IOTDB-4678] Sync DataNode start-up process (#7691)
---
 .../iotdb/confignode/manager/ConfigManager.java    | 11 ++++++
 .../apache/iotdb/confignode/manager/IManager.java  |  8 ++++
 .../iotdb/confignode/manager/SyncManager.java      | 30 +++++++++++++--
 .../persistence/sync/ClusterSyncInfo.java          | 25 ++++++++++++
 .../impl/sync/AbstractOperatePipeProcedure.java    |  3 ++
 .../procedure/impl/sync/CreatePipeProcedure.java   |  1 +
 .../procedure/impl/sync/DropPipeProcedure.java     |  5 ++-
 .../procedure/impl/sync/StartPipeProcedure.java    |  4 +-
 .../procedure/impl/sync/StopPipeProcedure.java     |  4 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  6 +++
 .../apache/iotdb/db/client/ConfigNodeClient.java   | 17 ++++++++
 .../java/org/apache/iotdb/db/sync/SyncService.java | 45 ++++++++++++++++------
 .../db/sync/common/ClusterSyncInfoFetcher.java     | 14 ++++++-
 .../src/main/thrift/confignode.thrift              |  8 ++++
 14 files changed, 163 insertions(+), 18 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 013494cb46..a074cdcd81 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -96,6 +96,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDeactivateSchemaTemplateReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
@@ -1155,6 +1156,16 @@ public class ConfigManager implements IManager {
     }
   }
 
+  @Override
+  public TGetAllPipeInfoResp getAllPipeInfo() {
+    TSStatus status = confirmLeader();
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return syncManager.getAllPipeInfo();
+    } else {
+      return new TGetAllPipeInfoResp().setStatus(status);
+    }
+  }
+
   @Override
   public TGetRegionIdResp getRegionId(GetRegionIdPlan plan) {
     TSStatus status = confirmLeader();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index ae682da23d..3c1eb022df 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -56,6 +56,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDeactivateSchemaTemplateReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
@@ -474,6 +475,13 @@ public interface IManager {
    */
   TShowPipeResp showPipe(TShowPipeReq req);
 
+  /**
+   * Get all pipe information. It is used for DataNode registration and restart.
+   *
+   * @return All pipe information.
+   */
+  TGetAllPipeInfoResp getAllPipeInfo();
+
   TGetRegionIdResp getRegionId(GetRegionIdPlan plan);
 
   TGetTimeSlotListResp getTimeSlotList(GetTimeSlotListPlan plan);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java
index 1670e602c7..9e929243fc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.sync.pipe.PipeInfo;
 import org.apache.iotdb.commons.sync.pipe.PipeStatus;
 import org.apache.iotdb.commons.sync.pipe.SyncOperation;
 import org.apache.iotdb.commons.sync.pipesink.PipeSink;
+import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
 import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
 import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
@@ -41,6 +42,7 @@ import org.apache.iotdb.confignode.consensus.response.PipeResp;
 import org.apache.iotdb.confignode.consensus.response.PipeSinkResp;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
 import org.apache.iotdb.mpp.rpc.thrift.TCreatePipeOnDataNodeReq;
@@ -68,6 +70,14 @@ public class SyncManager {
     this.clusterSyncInfo = clusterSyncInfo;
   }
 
+  public void lockSyncMetadata() {
+    clusterSyncInfo.lockSyncMetadata();
+  }
+
+  public void unlockSyncMetadata() {
+    clusterSyncInfo.unlockSyncMetadata();
+  }
+
   // ======================================================
   // region Implement of PipeSink
   // ======================================================
@@ -148,6 +158,22 @@ public class SyncManager {
     return clusterSyncInfo.getPipeInfo(pipeName);
   }
 
+  public TGetAllPipeInfoResp getAllPipeInfo() {
+    // Block PIPE procedures; lock before try so finally only releases a held lock
+    lockSyncMetadata();
+    try {
+      TGetAllPipeInfoResp resp = new TGetAllPipeInfoResp();
+      resp.setStatus(StatusUtils.OK);
+      resp.setAllPipeInfo(
+          clusterSyncInfo.getAllPipeInfos().stream()
+              .map(PipeInfo::serializeToByteBuffer)
+              .collect(Collectors.toList()));
+      return resp;
+    } finally {
+      unlockSyncMetadata();
+    }
+  }
+
   /**
    * Broadcast DataNodes to operate PIPE operation.
    *
@@ -160,8 +186,6 @@ public class SyncManager {
     NodeManager nodeManager = configManager.getNodeManager();
     final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         nodeManager.getRegisteredDataNodeLocations();
-    final List<TSStatus> dataNodeResponseStatus =
-        Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
     final TOperatePipeOnDataNodeReq request =
         new TOperatePipeOnDataNodeReq(pipeName, (byte) operation.ordinal());
 
@@ -169,7 +193,7 @@ public class SyncManager {
         new AsyncClientHandler<>(DataNodeRequestType.OPERATE_PIPE, request, dataNodeLocationMap);
     AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
 
-    return dataNodeResponseStatus;
+    return clientHandler.getResponseList();
   }
 
   /**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java
index 4c4a8b6e2b..b2805d658b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java
@@ -47,6 +47,8 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
 
 public class ClusterSyncInfo implements SnapshotProcessor {
 
@@ -54,6 +56,8 @@ public class ClusterSyncInfo implements SnapshotProcessor {
 
   private final SyncMetadata syncMetadata;
 
+  private final ReentrantLock syncMetadataLock = new ReentrantLock();
+
   public ClusterSyncInfo() {
     syncMetadata = new SyncMetadata();
   }
@@ -169,6 +173,27 @@ public class ClusterSyncInfo implements SnapshotProcessor {
     return pipeInfo;
   }
 
+  public List<PipeInfo> getAllPipeInfos() {
+    return syncMetadata.getAllPipeInfos();
+  }
+
+  // endregion
+
+  // ======================================================
+  // region Implement of Lock and Unlock
+  // ======================================================
+
+  public void lockSyncMetadata() {
+    LOGGER.info("Lock SyncMetadata");
+    syncMetadataLock.lock();
+    LOGGER.info("Acquire SyncMetadata lock");
+  }
+
+  public void unlockSyncMetadata() {
+    LOGGER.info("Unlock SyncMetadata");
+    syncMetadataLock.unlock();
+  }
+
   // endregion
 
   // ======================================================
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/AbstractOperatePipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/AbstractOperatePipeProcedure.java
index af42694106..7af1093843 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/AbstractOperatePipeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/AbstractOperatePipeProcedure.java
@@ -65,7 +65,9 @@ abstract class AbstractOperatePipeProcedure
     try {
       switch (state) {
         case OPERATE_CHECK:
+          env.getConfigManager().getSyncManager().lockSyncMetadata();
           if (executeCheckCanSkip(env)) {
+            env.getConfigManager().getSyncManager().unlockSyncMetadata();
             return Flow.NO_MORE_STATE;
           }
           setNextState(OperatePipeState.PRE_OPERATE_PIPE_CONFIGNODE);
@@ -80,6 +82,7 @@ abstract class AbstractOperatePipeProcedure
           break;
         case OPERATE_PIPE_CONFIGNODE:
           executeOperatePipeOnConfigNode(env);
+          env.getConfigManager().getSyncManager().unlockSyncMetadata();
           return Flow.NO_MORE_STATE;
       }
     } catch (PipeException | PipeSinkException e) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/CreatePipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/CreatePipeProcedure.java
index fdaa1a2a13..d1b966bd90 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/CreatePipeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/CreatePipeProcedure.java
@@ -116,6 +116,7 @@ public class CreatePipeProcedure extends AbstractOperatePipeProcedure {
   protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeState state)
       throws IOException, InterruptedException, ProcedureException {
     LOGGER.error("Roll back CreatePipeProcedure at STATE [{}]", state);
+    env.getConfigManager().getSyncManager().unlockSyncMetadata();
     // TODO(sync): roll back logic;
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/DropPipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/DropPipeProcedure.java
index 5bbd6e5da5..6f9425a9cf 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/DropPipeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/DropPipeProcedure.java
@@ -107,7 +107,10 @@ public class DropPipeProcedure extends AbstractOperatePipeProcedure {
 
   @Override
   protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeState state)
-      throws IOException, InterruptedException, ProcedureException {}
+      throws IOException, InterruptedException, ProcedureException {
+
+    env.getConfigManager().getSyncManager().unlockSyncMetadata();
+  }
 
   @Override
   public void serialize(DataOutputStream stream) throws IOException {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java
index 7061443503..d455b2ab0c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java
@@ -102,7 +102,9 @@ public class StartPipeProcedure extends AbstractOperatePipeProcedure {
 
   @Override
   protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeState state)
-      throws IOException, InterruptedException, ProcedureException {}
+      throws IOException, InterruptedException, ProcedureException {
+    env.getConfigManager().getSyncManager().unlockSyncMetadata();
+  }
 
   @Override
   public void serialize(DataOutputStream stream) throws IOException {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StopPipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StopPipeProcedure.java
index 086811f523..de355b7829 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StopPipeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StopPipeProcedure.java
@@ -102,7 +102,9 @@ public class StopPipeProcedure extends AbstractOperatePipeProcedure {
 
   @Override
   protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeState state)
-      throws IOException, InterruptedException, ProcedureException {}
+      throws IOException, InterruptedException, ProcedureException {
+    env.getConfigManager().getSyncManager().unlockSyncMetadata();
+  }
 
   @Override
   public void serialize(DataOutputStream stream) throws IOException {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 3269a1252f..5dec58d8d0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -91,6 +91,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
@@ -675,6 +676,11 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
     return configManager.showPipe(req);
   }
 
+  @Override
+  public TGetAllPipeInfoResp getAllPipeInfo() throws TException {
+    return configManager.getAllPipeInfo();
+  }
+
   @Override
   public TGetRegionIdResp getRegionId(TGetRegionIdReq req) {
     if (req.isSetTimeSlotId() && req.getType() != TConsensusGroupType.DataRegion) {
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index f08e3ba926..b0738b5f1d 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -58,6 +58,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
@@ -1226,6 +1227,22 @@ public class ConfigNodeClient
     throw new TException(MSG_RECONNECTION_FAIL);
   }
 
+  @Override
+  public TGetAllPipeInfoResp getAllPipeInfo() throws TException {
+    for (int i = 0; i < RETRY_NUM; i++) {
+      try {
+        TGetAllPipeInfoResp resp = client.getAllPipeInfo();
+        if (!updateConfigNodeLeader(resp.getStatus())) {
+          return resp;
+        }
+      } catch (TException e) {
+        configLeader = null;
+      }
+      reconnect();
+    }
+    throw new TException(MSG_RECONNECTION_FAIL);
+  }
+
   @Override
   public TGetRegionIdResp getRegionId(TGetRegionIdReq req) throws TException {
     for (int i = 0; i < RETRY_NUM; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
index 64237e15d5..372ba5e1c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
@@ -34,7 +34,6 @@ import org.apache.iotdb.commons.sync.pipe.PipeStatus;
 import org.apache.iotdb.commons.sync.pipe.TsFilePipeInfo;
 import org.apache.iotdb.commons.sync.pipesink.PipeSink;
 import org.apache.iotdb.commons.sync.utils.SyncConstant;
-import org.apache.iotdb.commons.sync.utils.SyncPathUtil;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
 import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -70,7 +69,6 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -253,7 +251,12 @@ public class SyncService implements IService {
 
   public synchronized void dropPipe(String pipeName) throws PipeException {
     logger.info("Execute drop PIPE {}", pipeName);
-    Pipe runningPipe = getPipe(pipeName);
+    Pipe runningPipe;
+    try {
+      runningPipe = getPipe(pipeName);
+    } catch (PipeNotExistException e) {
+      return;
+    }
     if (runningPipe.getPipeSink().getType() != PipeSink.PipeSinkType.IoTDB) { // for external pipe
       // == drop ExternalPipeProcessor
       if (extPipePluginManagers.containsKey(pipeName)) {
@@ -476,14 +479,11 @@ public class SyncService implements IService {
         extPipePluginRegister.getAllPluginName().size(),
         extPipePluginRegister.getAllPluginName());
 
-    File senderLog = new File(SyncPathUtil.getSysDir(), SyncConstant.SYNC_LOG_NAME);
-    if (senderLog.exists()) {
-      try {
-        recover();
-      } catch (Exception e) {
-        logger.error("Recover from disk error.", e);
-        throw new StartupException(e);
-      }
+    try {
+      recover();
+    } catch (Exception e) {
+      logger.error("Recover from disk error.", e);
+      throw new StartupException(e);
     }
   }
 
@@ -518,9 +518,27 @@ public class SyncService implements IService {
     return ServiceType.SYNC_SERVICE;
   }
 
+  /**
+   * If run on standalone version, recover from disk.
+   *
+   * <p>If run on MPP version, init or recover from ConfigNode.
+   */
   private void recover() throws IOException, PipeException, PipeSinkException {
     List<PipeInfo> allPipeInfos = syncInfoFetcher.getAllPipeInfos();
     for (PipeInfo pipeInfo : allPipeInfos) {
+      logger.info(
+          "Recover PIPE [{}] whose status is {}",
+          pipeInfo.getPipeName(),
+          pipeInfo.getStatus().name());
+      if (PipeStatus.PREPARE_CREATE.equals(pipeInfo.getStatus())
+          || PipeStatus.PREPARE_DROP.equals(pipeInfo.getStatus())) {
+        // skip
+        logger.info(
+            "Skip PIPE [{}] because its status is {}",
+            pipeInfo.getPipeName(),
+            pipeInfo.getStatus().name());
+        continue;
+      }
       Pipe pipe =
           SyncPipeUtil.parsePipeInfoAsPipe(
               pipeInfo, syncInfoFetcher.getPipeSink(pipeInfo.getPipeSinkName()));
@@ -530,8 +548,13 @@ public class SyncService implements IService {
           pipe.start();
           break;
         case STOP:
+        case PREPARE_START:
+        case PREPARE_STOP:
           pipe.stop();
           break;
+        case PREPARE_CREATE:
+        case PREPARE_DROP:
+          throw new PipeException("Unexpected status " + pipeInfo.getStatus().name());
         default:
           throw new IOException(
               String.format("Can not recognize running pipe status %s.", pipe.getStatus()));
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
index 9906e2fb70..8021d9815c 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.exception.sync.PipeSinkException;
 import org.apache.iotdb.commons.sync.pipe.PipeInfo;
 import org.apache.iotdb.commons.sync.pipe.PipeMessage;
 import org.apache.iotdb.commons.sync.pipesink.PipeSink;
+import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
 import org.apache.iotdb.db.client.ConfigNodeClient;
@@ -38,7 +39,9 @@ import org.apache.iotdb.rpc.RpcUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /** Only fetch read request. For write request, return SUCCESS directly. */
 public class ClusterSyncInfoFetcher implements ISyncInfoFetcher {
@@ -115,7 +118,16 @@ public class ClusterSyncInfoFetcher implements ISyncInfoFetcher {
 
   @Override
   public List<PipeInfo> getAllPipeInfos() {
-    throw new UnsupportedOperationException();
+    try (ConfigNodeClient configNodeClient =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+      TGetAllPipeInfoResp resp = configNodeClient.getAllPipeInfo();
+      return resp.getAllPipeInfo().stream()
+          .map(PipeInfo::deserializePipeInfo)
+          .collect(Collectors.toList());
+    } catch (Exception e) {
+      LOGGER.error("Get AllPipeInfos error because {}", e.getMessage(), e);
+      return Collections.emptyList();
+    }
   }
 
   @Override
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 503341ae05..c95f6a8637 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -486,6 +486,11 @@ struct TShowPipeInfo {
   6: required string message
 }
 
+struct TGetAllPipeInfoResp{
+  1: required common.TSStatus status
+  2: optional list<binary> allPipeInfo
+}
+
 struct TCreatePipeReq {
     1: required string pipeName
     2: required string pipeSinkName
@@ -914,6 +919,9 @@ service IConfigNodeRPCService {
   /** Show Pipe by name, if name is empty, show all Pipe */
   TShowPipeResp showPipe(TShowPipeReq req)
 
+  /** Get all pipe information. It is used for DataNode registration and restart. */
+  TGetAllPipeInfoResp getAllPipeInfo();
+
   // ======================================================
   // TestTools
   // ======================================================