You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/11/25 16:59:09 UTC

[iotdb] branch master updated: [IOTDB-4970] Enhanced sync safety management (#8028)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b59ce4faa7 [IOTDB-4970] Enhanced sync safety management (#8028)
b59ce4faa7 is described below

commit b59ce4faa76c2618c0dc8b6aacbc9dd4bb52f45a
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Sat Nov 26 00:59:03 2022 +0800

    [IOTDB-4970] Enhanced sync safety management (#8028)
---
 .../consensus/request/ConfigPhysicalPlan.java      |  4 ++
 .../consensus/request/ConfigPhysicalPlanType.java  |  1 +
 .../request/write/sync/RecordPipeMessagePlan.java  | 65 ++++++++++++++++++++++
 .../iotdb/confignode/manager/ConfigManager.java    | 13 +++++
 .../apache/iotdb/confignode/manager/IManager.java  |  8 +++
 .../iotdb/confignode/manager/SyncManager.java      | 12 ++++
 .../persistence/executor/ConfigPlanExecutor.java   |  3 +
 .../persistence/sync/ClusterSyncInfo.java          |  9 ++-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  6 ++
 .../Edge-Cloud-Collaboration/Sync-Tool.md          | 21 +++----
 .../Edge-Cloud-Collaboration/Sync-Tool.md          | 13 +++--
 .../db/integration/sync/IoTDBSyncReceiverIT.java   |  2 +-
 .../sync/IoTDBSyncReceiverLoaderIT.java            |  6 +-
 .../iotdb/db/integration/sync/MockSyncClient.java  |  4 +-
 .../resources/conf/iotdb-common.properties         |  6 +-
 .../exception/sync}/SyncConnectionException.java   |  2 +-
 .../exception/sync/SyncHandshakeException.java     | 34 ++---------
 .../iotdb/commons/sync/metadata/SyncMetadata.java  |  3 +
 .../iotdb/commons/sync/pipe/PipeMessage.java       | 54 +++++++++++++++---
 .../commons/sync/transport/SyncIdentityInfo.java   | 57 +++++++++++++++++++
 .../iotdb/commons/sync/utils/SyncPathUtil.java     | 11 +---
 .../apache/iotdb/db/client/ConfigNodeClient.java   | 17 ++++++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../control/clientsession/IClientSession.java      |  2 +-
 .../service/thrift/impl/ClientRPCServiceImpl.java  |  7 ++-
 .../db/service/thrift/impl/TSServiceImpl.java      |  3 +-
 .../java/org/apache/iotdb/db/sync/SyncService.java | 24 ++++----
 .../db/sync/common/ClusterSyncInfoFetcher.java     | 12 +++-
 .../iotdb/db/sync/pipedata/DeletionPipeData.java   |  4 +-
 .../apache/iotdb/db/sync/pipedata/PipeData.java    |  2 +-
 .../iotdb/db/sync/pipedata/TsFilePipeData.java     |  4 +-
 .../load/DeletionLoader.java                       |  2 +-
 .../sync/{receiver => pipedata}/load/ILoader.java  |  2 +-
 .../{receiver => pipedata}/load/TsFileLoader.java  |  2 +-
 .../iotdb/db/sync/sender/pipe/TsFilePipe.java      |  2 +
 .../db/sync/transport/client/ISyncClient.java      |  7 ++-
 .../db/sync/transport/client/IoTDBSyncClient.java  | 34 +++++------
 .../db/sync/transport/client/SenderManager.java    | 49 +++++++++++-----
 .../sync/transport/client/SyncClientFactory.java   | 34 +----------
 .../db/sync/transport/server/ReceiverManager.java  | 54 ++++++++++--------
 .../src/main/thrift/confignode.thrift              |  8 +++
 thrift/src/main/thrift/client.thrift               | 10 ++--
 42 files changed, 417 insertions(+), 198 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index e1b8e58c6d..c09b2f7ade 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -77,6 +77,7 @@ import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipePlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.RecordPipeMessagePlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan;
 import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
@@ -340,6 +341,9 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest {
         case ShowPipe:
           plan = new ShowPipePlan();
           break;
+        case RecordPipeMessage:
+          plan = new RecordPipeMessagePlan();
+          break;
         case GetRegionId:
           plan = new GetRegionIdPlan();
           break;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index 4cea224021..3884beb5c4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -119,6 +119,7 @@ public enum ConfigPhysicalPlanType {
   SetPipeStatus((short) 904),
   DropPipe((short) 905),
   ShowPipe((short) 906),
+  RecordPipeMessage((short) 907),
 
   /** Trigger */
   AddTriggerInTable((short) 1000),
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/RecordPipeMessagePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/RecordPipeMessagePlan.java
new file mode 100644
index 0000000000..d29aa92a97
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/RecordPipeMessagePlan.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.consensus.request.write.sync;
+
+import org.apache.iotdb.commons.sync.pipe.PipeMessage;
+import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class RecordPipeMessagePlan extends ConfigPhysicalPlan {
+
+  private String pipeName;
+  private PipeMessage pipeMessage;
+
+  public RecordPipeMessagePlan() {
+    super(ConfigPhysicalPlanType.RecordPipeMessage);
+  }
+
+  public RecordPipeMessagePlan(String pipeName, PipeMessage pipeMessage) {
+    this();
+    this.pipeName = pipeName;
+    this.pipeMessage = pipeMessage;
+  }
+
+  public String getPipeName() {
+    return pipeName;
+  }
+
+  public PipeMessage getPipeMessage() {
+    return pipeMessage;
+  }
+
+  @Override
+  protected void serializeImpl(DataOutputStream stream) throws IOException {
+    stream.writeShort(getType().getPlanType());
+    BasicStructureSerDeUtil.write(pipeName, stream);
+    pipeMessage.serialize(stream);
+  }
+
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+    pipeName = BasicStructureSerDeUtil.readString(buffer);
+    pipeMessage = PipeMessage.deserialize(buffer);
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 28d5db84b8..e521529831 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.sync.pipe.PipeMessage;
 import org.apache.iotdb.commons.utils.AuthUtils;
 import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.commons.utils.StatusUtils;
@@ -117,6 +118,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
@@ -1280,6 +1282,17 @@ public class ConfigManager implements IManager {
     }
   }
 
+  @Override
+  public TSStatus recordPipeMessage(TRecordPipeMessageReq req) {
+    TSStatus status = confirmLeader();
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return syncManager.recordPipeMessage(
+          req.getPipeName(), PipeMessage.deserialize(ByteBuffer.wrap(req.getMessage())));
+    } else {
+      return status;
+    }
+  }
+
   @Override
   public TGetRegionIdResp getRegionId(GetRegionIdPlan plan) {
     TSStatus status = confirmLeader();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index ac77ab2954..b20a7f3c24 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -76,6 +76,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
@@ -519,6 +520,13 @@ public interface IManager {
    */
   TGetAllPipeInfoResp getAllPipeInfo();
 
+  /**
+   * Record PipeMessage
+   *
+   * @return TSStatus
+   */
+  TSStatus recordPipeMessage(TRecordPipeMessageReq req);
+
   TGetRegionIdResp getRegionId(GetRegionIdPlan plan);
 
   TGetTimeSlotListResp getTimeSlotList(GetTimeSlotListPlan plan);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java
index 326ad323c3..6bd74531ae 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.exception.sync.PipeException;
 import org.apache.iotdb.commons.exception.sync.PipeSinkException;
 import org.apache.iotdb.commons.exception.sync.PipeSinkNotExistException;
 import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.commons.sync.pipe.PipeMessage;
 import org.apache.iotdb.commons.sync.pipe.PipeStatus;
 import org.apache.iotdb.commons.sync.pipe.SyncOperation;
 import org.apache.iotdb.commons.sync.pipesink.PipeSink;
@@ -37,6 +38,7 @@ import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipePlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.RecordPipeMessagePlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan;
 import org.apache.iotdb.confignode.consensus.response.PipeResp;
@@ -149,6 +151,12 @@ public class SyncManager {
     return getConsensusManager().write(new DropPipePlan(pipeName)).getStatus();
   }
 
+  public TSStatus recordPipeMessage(String pipeName, PipeMessage pipeMessage) {
+    return getConsensusManager()
+        .write(new RecordPipeMessagePlan(pipeName, pipeMessage))
+        .getStatus();
+  }
+
   public TShowPipeResp showPipe(String pipeName) {
     ShowPipePlan showPipePlan = new ShowPipePlan(pipeName);
     PipeResp pipeResp = (PipeResp) getConsensusManager().read(showPipePlan).getDataset();
@@ -283,4 +291,8 @@ public class SyncManager {
   private ConsensusManager getConsensusManager() {
     return configManager.getConsensusManager();
   }
+
+  private ProcedureManager getProcedureManager() {
+    return configManager.getProcedureManager();
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index ea702afd1d..cda0870cfb 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -75,6 +75,7 @@ import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipePlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.RecordPipeMessagePlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan;
 import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
@@ -357,6 +358,8 @@ public class ConfigPlanExecutor {
         return syncInfo.setPipeStatus((SetPipeStatusPlan) physicalPlan);
       case DropPipe:
         return syncInfo.dropPipe((DropPipePlan) physicalPlan);
+      case RecordPipeMessage:
+        return syncInfo.recordPipeMessage((RecordPipeMessagePlan) physicalPlan);
       case ADD_CQ:
         return cqInfo.addCQ((AddCQPlan) physicalPlan);
       case DROP_CQ:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java
index 4472bbdb92..beb0e0e18f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipePlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.RecordPipeMessagePlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan;
 import org.apache.iotdb.confignode.consensus.response.PipeResp;
@@ -150,6 +151,12 @@ public class ClusterSyncInfo implements SnapshotProcessor {
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
   }
 
+  public TSStatus recordPipeMessage(RecordPipeMessagePlan physicalPlan) {
+    syncMetadata.changePipeMessage(
+        physicalPlan.getPipeName(), physicalPlan.getPipeMessage().getType());
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
   public PipeResp showPipe(ShowPipePlan plan) {
     PipeResp resp = new PipeResp();
     if (StringUtils.isEmpty(plan.getPipeName())) {
@@ -184,7 +191,7 @@ public class ClusterSyncInfo implements SnapshotProcessor {
   // endregion
 
   // ======================================================
-  // region Implement of Snapshot
+  // region Implement of Lock and Unlock
   // ======================================================
 
   public void lockSyncMetadata() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 97b4d9fd2e..93a1e12164 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -114,6 +114,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
@@ -721,6 +722,11 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
     return configManager.getAllPipeInfo();
   }
 
+  @Override
+  public TSStatus recordPipeMessage(TRecordPipeMessageReq req) throws TException {
+    return configManager.recordPipeMessage(req);
+  }
+
   @Override
   public TGetRegionIdResp getRegionId(TGetRegionIdReq req) {
     if (req.isSetTimeSlotId() && req.getType() != TConsensusGroupType.DataRegion) {
diff --git a/docs/UserGuide/Edge-Cloud-Collaboration/Sync-Tool.md b/docs/UserGuide/Edge-Cloud-Collaboration/Sync-Tool.md
index 516256835a..e2a0162663 100644
--- a/docs/UserGuide/Edge-Cloud-Collaboration/Sync-Tool.md
+++ b/docs/UserGuide/Edge-Cloud-Collaboration/Sync-Tool.md
@@ -121,9 +121,9 @@ All parameters are in `$IOTDB_ HOME$/conf/iotdb-common.properties`, after all mo
 
 | **Parameter Name** | **ip_white_list**                                            |
 | ------------------ | ------------------------------------------------------------ |
-| Description        | Set the white list of IP addresses of the sending end of the synchronization, which is expressed in the form of network segments, and multiple network segments are separated by commas. When the sender synchronizes data to the receiver, the receiver allows synchronization only when the IP address of the sender is within the network segment set in the white list. If the whitelist is empty, the receiver does not allow any sender to synchronize data. By default, the re [...]
+| Description        | Set the white list of IP addresses of the sender of the synchronization, which is expressed in the form of network segments, and multiple network segments are separated by commas. When the sender synchronizes data to the receiver, the receiver allows synchronization only when the IP address of the sender is within the network segment set in the white list. If the whitelist is empty, the receiver does not allow any sender to synchronize data. By default, the receive [...]
 | Data type          | String                                                       |
-| Default value      | 0.0.0.0/0                                                    |
+| Default value      | 127.0.0.1/32                                                    |
 
 ## 6.SQL
 
@@ -231,9 +231,10 @@ IoTDB> DROP PIPE <PipeName>
     - When role is sender, the value of this field is the synchronization start time of the Pipe and whether to synchronize the delete operation.
     - When role is receiver, the value of this field is the name of the database corresponding to the synchronization connection created on this DataNode.
 
-  - `message`: the status message of this pipe. When pipe runs normally, this column is usually empty. When an exception occurs, messages may appear in  following two states.
+  - `message`: the status message of this pipe. When pipe runs normally, this column is NORMAL. When an exception occurs, messages may appear in  following two states.
     - WARN, this indicates that a data loss or other error has occurred, but the pipe will remain running.
-    - ERROR, this indicates that the network is interrupted for a long time or there is a problem at the receiving end. The pipe is stopped and set to STOP state.
+    - ERROR, This indicates a problem where the network connection works but the data cannot be transferred, for example, the IP of the sender is not in the whitelist of the receiver or the version of the sender is not compatible with that of the receiver.
+    - When the ERROR status appears, it is recommended to check the DataNode logs after STOP PIPE, check the receiver configuration or network conditions, and then START PIPE again.
 
 
 ```
@@ -242,9 +243,9 @@ IoTDB>
 +-----------------------+--------+--------+-------------+---------+------------------------------------+-------+
 |            create time|   name |    role|       remote|   status|                          attributes|message|
 +-----------------------+--------+--------+-------------+---------+------------------------------------+-------+
-|2022-03-30T20:58:30.689|my_pipe1|  sender|  my_pipesink|     STOP|SyncDelOp=false,DataStartTimestamp=0|       |
+|2022-03-30T20:58:30.689|my_pipe1|  sender|  my_pipesink|     STOP|SyncDelOp=false,DataStartTimestamp=0| NORMAL|
 +-----------------------+--------+--------+-------------+---------+------------------------------------+-------+ 
-|2022-03-31T12:55:28.129|my_pipe2|receiver|192.168.11.11|  RUNNING|             Database='root.vehicle'|       |
+|2022-03-31T12:55:28.129|my_pipe2|receiver|192.168.11.11|  RUNNING|             Database='root.vehicle'| NORMAL|
 +-----------------------+--------+--------+-------------+---------+------------------------------------+-------+
 ```
 
@@ -265,18 +266,18 @@ IoTDB> SHOW PIPE [PipeName]
 
 ### Receiver
 
-- `vi conf/iotdb-common.properties`  to config the parameters,set the IP white list to 192.168.0.1/1 to receive and only receive data from sender.
+- `vi conf/iotdb-common.properties`  to config the parameters,set the IP white list to 192.168.0.1/32 to receive and only receive data from sender.
 
 ```
 ####################
 ### PIPE Server Configuration
 ####################
 # White IP list of Sync client.
-# Please use the form of network segment to present the range of IP, for example: 192.168.0.0/16
+# Please use the form of IPv4 network segment to present the range of IP, for example: 192.168.0.0/16
 # If there are more than one IP segment, please separate them by commas
-# The default is to allow all IP to sync
+# The default is to reject all IP to sync except 0.0.0.0
 # Datatype: String
-ip_white_list=192.168.0.1/1
+ip_white_list=192.168.0.1/32
 ```
 
 ### Sender
diff --git a/docs/zh/UserGuide/Edge-Cloud-Collaboration/Sync-Tool.md b/docs/zh/UserGuide/Edge-Cloud-Collaboration/Sync-Tool.md
index 4431760979..c269441b21 100644
--- a/docs/zh/UserGuide/Edge-Cloud-Collaboration/Sync-Tool.md
+++ b/docs/zh/UserGuide/Edge-Cloud-Collaboration/Sync-Tool.md
@@ -114,9 +114,9 @@ IoTDB> DROP PIPE my_pipe
 
 | **参数名** | **ip_white_list**                                            |
 | ---------- | ------------------------------------------------------------ |
-| 描述       | 设置同步功能发送端 IP 地址的白名单,以网段的形式表示,多个网段之间用逗号分隔。发送端向接收端同步数据时,只有当该发送端 IP 地址处于该白名单设置的网段范围内,接收端才允许同步操作。如果白名单为空,则接收端不允许任何发送端同步数据。默认接收端接受全部 IP 的同步请求。 对该参数进行配置时,需要保证发送端所有 DataNode 地址均被覆盖。 |
+| 描述       | 设置同步功能发送端 IP 地址的白名单,以网段的形式表示,多个网段之间用逗号分隔。发送端向接收端同步数据时,只有当该发送端 IP 地址处于该白名单设置的网段范围内,接收端才允许同步操作。如果白名单为空,则接收端不允许任何发送端同步数据。默认接收端拒绝除了本地以外的全部 IP 的同步请求。 对该参数进行配置时,需要保证发送端所有 DataNode 地址均被覆盖。 |
 | 类型       | String                                                       |
-| 默认值     | 0.0.0.0/0                                                    |
+| 默认值     | 127.0.0.1/32                                                    |
 
 ## 6.SQL
 
@@ -222,9 +222,10 @@ IoTDB> DROP PIPE <PipeName>
     - 当 role 为 sender 时,这一字段值为 Pipe 的同步起始时间和是否同步删除操作
     - 当 role 为 receiver 时,这一字段值为当前 DataNode 上创建的同步连接对应的存储组名称
 
-  - `message`:Pipe运行信息,当 Pipe 正常运行时,这一字段通常为空,当出现异常时,可能出现两种状态:
+  - `message`:Pipe运行信息,当 Pipe 正常运行时,这一字段通常为NORMAL,当出现异常时,可能出现两种状态:
     - WARN 状态,这表明发生了数据丢失或者其他错误,但是 Pipe 会保持运行
-    - ERROR 状态,这表明发生了网络长时间中断或者接收端出现问题,Pipe 被停止,置为 STOP 状态
+    - ERROR 状态,这表明出现了网络连接正常但数据无法传输的问题,例如发送端 IP 不在接收端白名单中,或是发送端与接收端版本不兼容
+    - 当出现 ERROR 状态时,建议 STOP PIPE 后查看 DataNode 日志,检查接收端配置或网络情况后重新 START PIPE
 
 ```Plain%20Text
 IoTDB> SHOW PIPES
@@ -232,9 +233,9 @@ IoTDB>
 +-----------------------+--------+--------+-------------+---------+------------------------------------+-------+
 |            create time|   name |    role|       remote|   status|                          attributes|message|
 +-----------------------+--------+--------+-------------+---------+------------------------------------+-------+
-|2022-03-30T20:58:30.689|my_pipe1|  sender|  my_pipesink|     STOP|SyncDelOp=false,DataStartTimestamp=0|       |
+|2022-03-30T20:58:30.689|my_pipe1|  sender|  my_pipesink|     STOP|SyncDelOp=false,DataStartTimestamp=0| NORMAL|
 +-----------------------+--------+--------+-------------+---------+------------------------------------+-------+ 
-|2022-03-31T12:55:28.129|my_pipe2|receiver|192.168.11.11|  RUNNING|             Database='root.vehicle'|       |
+|2022-03-31T12:55:28.129|my_pipe2|receiver|192.168.11.11|  RUNNING|             Database='root.vehicle'| NORMAL|
 +-----------------------+--------+--------+-------------+---------+------------------------------------+-------+
 ```
 
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
index 876dc775ff..c60e62d891 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
@@ -83,7 +83,7 @@ public class IoTDBSyncReceiverIT {
     EnvironmentUtils.envSetUp();
     Pipe pipe = new TsFilePipe(createdTime1, pipeName1, new IoTDBPipeSink("sink"), 0, false);
     remoteIp1 = "127.0.0.1";
-    client = new IoTDBSyncClient(pipe, remoteIp1, 6667, "127.0.0.1", "root.vehicle");
+    client = new IoTDBSyncClient(pipe, remoteIp1, 6667, "root.vehicle");
     client.handshake();
   }
 
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverLoaderIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverLoaderIT.java
index b2bf607184..f907a03d89 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverLoaderIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverLoaderIT.java
@@ -21,9 +21,9 @@ package org.apache.iotdb.db.integration.sync;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.sync.receiver.load.DeletionLoader;
-import org.apache.iotdb.db.sync.receiver.load.ILoader;
-import org.apache.iotdb.db.sync.receiver.load.TsFileLoader;
+import org.apache.iotdb.db.sync.pipedata.load.DeletionLoader;
+import org.apache.iotdb.db.sync.pipedata.load.ILoader;
+import org.apache.iotdb.db.sync.pipedata.load.TsFileLoader;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.itbase.category.LocalStandaloneTest;
 
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/MockSyncClient.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/MockSyncClient.java
index 5df172b604..2af5596e1f 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/MockSyncClient.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/MockSyncClient.java
@@ -37,9 +37,7 @@ public class MockSyncClient implements ISyncClient {
   }
 
   @Override
-  public boolean handshake() {
-    return true;
-  }
+  public void handshake() {}
 
   @Override
   public boolean send(PipeData pipeData) {
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index d582b485d5..104c74c2da 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -884,11 +884,11 @@
 ####################
 
 # White IP list of Sync client.
-# Please use the form of network segment to present the range of IP, for example: 192.168.0.0/16
+# Please use the form of IPv4 network segment to present the range of IP, for example: 192.168.0.0/16
 # If there are more than one IP segment, please separate them by commas
-# The default is to allow all IP to sync
+# The default is to reject all IP to sync except 127.0.0.1
 # Datatype: String
-# ip_white_list=0.0.0.0/0
+# ip_white_list=127.0.0.1/32
 
 # The maximum number of retry when syncing a file to receiver fails.
 # max_number_of_sync_file_retry=5
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/SyncConnectionException.java b/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/SyncConnectionException.java
similarity index 96%
rename from server/src/main/java/org/apache/iotdb/db/exception/SyncConnectionException.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/SyncConnectionException.java
index eec90c83af..7a88fa1a8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/SyncConnectionException.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/SyncConnectionException.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.exception;
+package org.apache.iotdb.commons.exception.sync;
 
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.rpc.TSStatusCode;
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/MockSyncClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/SyncHandshakeException.java
similarity index 55%
copy from integration/src/test/java/org/apache/iotdb/db/integration/sync/MockSyncClient.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/SyncHandshakeException.java
index 5df172b604..61903d69ad 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/MockSyncClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/SyncHandshakeException.java
@@ -16,37 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.integration.sync;
+package org.apache.iotdb.commons.exception.sync;
 
-import org.apache.iotdb.db.sync.pipedata.PipeData;
-import org.apache.iotdb.db.sync.transport.client.ISyncClient;
+public class SyncHandshakeException extends SyncConnectionException {
 
-import java.util.ArrayList;
-import java.util.List;
-
-public class MockSyncClient implements ISyncClient {
-
-  private final List<PipeData> pipeDataList;
-
-  public MockSyncClient() {
-    this.pipeDataList = new ArrayList<>();
-  }
-
-  public List<PipeData> getPipeDataList() {
-    return pipeDataList;
+  public SyncHandshakeException(String message) {
+    super(message);
   }
-
-  @Override
-  public boolean handshake() {
-    return true;
-  }
-
-  @Override
-  public boolean send(PipeData pipeData) {
-    pipeDataList.add(pipeData);
-    return true;
-  }
-
-  @Override
-  public void close() {}
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/metadata/SyncMetadata.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/metadata/SyncMetadata.java
index ef6591cc0d..97c9a2885c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/metadata/SyncMetadata.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/metadata/SyncMetadata.java
@@ -157,6 +157,9 @@ public class SyncMetadata implements SnapshotProcessor {
 
   public void setPipeStatus(String pipeName, PipeStatus status) {
     pipes.get(pipeName).setStatus(status);
+    if (status.equals(PipeStatus.RUNNING)) {
+      pipes.get(pipeName).setMessageType(PipeMessage.PipeMessageType.NORMAL);
+    }
   }
 
   public PipeInfo getPipeInfo(String pipeName) {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeMessage.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeMessage.java
index 220c1b1026..b0f636fa56 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeMessage.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeMessage.java
@@ -18,6 +18,13 @@
  */
 package org.apache.iotdb.commons.sync.pipe;
 
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
 public class PipeMessage {
   private final String message;
   private final PipeMessageType type;
@@ -35,24 +42,55 @@ public class PipeMessage {
     return type;
   }
 
+  public ByteBuffer serializeToByteBuffer() throws IOException {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+    serialize(dataOutputStream);
+    return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
+  }
+
+  public void serialize(DataOutputStream dataOutputStream) throws IOException {
+    ReadWriteIOUtils.write(message, dataOutputStream);
+    ReadWriteIOUtils.write(type.getType(), dataOutputStream);
+  }
+
+  public static PipeMessage deserialize(ByteBuffer buffer) {
+    String message = ReadWriteIOUtils.readString(buffer);
+    PipeMessageType type = PipeMessageType.getPipeStatus(ReadWriteIOUtils.readByte(buffer));
+    return new PipeMessage(type, message);
+  }
+
   @Override
   public String toString() {
     return "PipeMessage{" + "message='" + message + '\'' + ", type=" + type + '}';
   }
 
   public enum PipeMessageType {
-    NORMAL(1),
-    WARN(2),
-    ERROR(3);
+    NORMAL((byte) 1),
+    WARN((byte) 2),
+    ERROR((byte) 3);
 
-    private int value;
+    private byte type;
+
+    PipeMessageType(byte type) {
+      this.type = type;
+    }
 
-    PipeMessageType(int value) {
-      this.value = value;
+    public byte getType() {
+      return type;
     }
 
-    public int getValue() {
-      return value;
+    public static PipeMessageType getPipeStatus(byte type) {
+      switch (type) {
+        case 1:
+          return PipeMessageType.NORMAL;
+        case 2:
+          return PipeMessageType.WARN;
+        case 3:
+          return PipeMessageType.ERROR;
+        default:
+          throw new IllegalArgumentException("Invalid input: " + type);
+      }
     }
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/transport/SyncIdentityInfo.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/transport/SyncIdentityInfo.java
new file mode 100644
index 0000000000..139a715fa9
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/transport/SyncIdentityInfo.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.sync.transport;
+
+import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
+
+public class SyncIdentityInfo {
+  public String pipeName;
+  public long createTime;
+  public String version;
+  public String remoteAddress;
+  public String database;
+
+  public SyncIdentityInfo(TSyncIdentityInfo identityInfo, String remoteAddress) {
+    this.pipeName = identityInfo.getPipeName();
+    this.createTime = identityInfo.getCreateTime();
+    this.version = identityInfo.getVersion();
+    this.database = identityInfo.getDatabase();
+    this.remoteAddress = remoteAddress;
+  }
+
+  public String getPipeName() {
+    return pipeName;
+  }
+
+  public long getCreateTime() {
+    return createTime;
+  }
+
+  public String getVersion() {
+    return version;
+  }
+
+  public String getRemoteAddress() {
+    return remoteAddress;
+  }
+
+  public String getDatabase() {
+    return database;
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/utils/SyncPathUtil.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/utils/SyncPathUtil.java
index 086f5bda32..157110d498 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/utils/SyncPathUtil.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/utils/SyncPathUtil.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.commons.sync.utils;
 
 import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
+import org.apache.iotdb.commons.sync.transport.SyncIdentityInfo;
 
 import java.io.File;
 import java.io.IOException;
@@ -123,14 +123,9 @@ public class SyncPathUtil {
         + SyncConstant.FILE_DATA_DIR_NAME;
   }
 
-  public static String getFileDataDirPath(TSyncIdentityInfo identityInfo) {
+  public static String getFileDataDirPath(SyncIdentityInfo identityInfo) {
     return SyncPathUtil.getReceiverFileDataDir(
-        identityInfo.getPipeName(), identityInfo.getAddress(), identityInfo.getCreateTime());
-  }
-
-  public static String getPipeLogDirPath(TSyncIdentityInfo identityInfo) {
-    return SyncPathUtil.getReceiverPipeLogDir(
-        identityInfo.getPipeName(), identityInfo.getAddress(), identityInfo.getCreateTime());
+        identityInfo.getPipeName(), identityInfo.getRemoteAddress(), identityInfo.getCreateTime());
   }
 
   /** common */
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 2d14fd9394..642374eefa 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -81,6 +81,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
@@ -1620,6 +1621,22 @@ public class ConfigNodeClient
     throw new TException(MSG_RECONNECTION_FAIL);
   }
 
+  @Override
+  public TSStatus recordPipeMessage(TRecordPipeMessageReq req) throws TException {
+    for (int i = 0; i < RETRY_NUM; i++) {
+      try {
+        TSStatus status = client.recordPipeMessage(req);
+        if (!updateConfigNodeLeader(status)) {
+          return status;
+        }
+      } catch (TException e) {
+        configLeader = null;
+      }
+      reconnect();
+    }
+    throw new TException(MSG_RECONNECTION_FAIL);
+  }
+
   @Override
   public TGetRegionIdResp getRegionId(TGetRegionIdReq req) throws TException {
     for (int i = 0; i < RETRY_NUM; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index dc11dc1460..01f37004bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -544,7 +544,7 @@ public class IoTDBConfig {
   private int externalSortThreshold = 1000;
 
   /** White list for sync */
-  private String ipWhiteList = "0.0.0.0/0";
+  private String ipWhiteList = "127.0.0.1/32";
 
   /** The maximum number of retries when the sender fails to synchronize files to the receiver. */
   private int maxNumberOfSyncFileRetry = 5;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java
index 81d70a93a0..2e5b646634 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java
@@ -43,7 +43,7 @@ public abstract class IClientSession {
 
   private long logInTime;
 
-  abstract String getClientAddress();
+  public abstract String getClientAddress();
 
   abstract int getClientPort();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index e586de0d18..055bcda62b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -1693,7 +1693,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSStatus handshake(TSyncIdentityInfo info) throws TException {
     // TODO(sync): Check permissions here
-    return SyncService.getInstance().handshake(info, PARTITION_FETCHER, SCHEMA_FETCHER);
+    return SyncService.getInstance()
+        .handshake(
+            info,
+            SESSION_MANAGER.getCurrSession().getClientAddress(),
+            PARTITION_FETCHER,
+            SCHEMA_FETCHER);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index cd384011ff..b31bfc40cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -1375,7 +1375,8 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
 
   @Override
   public TSStatus handshake(TSyncIdentityInfo info) throws TException {
-    return SyncService.getInstance().handshake(info, null, null);
+    return SyncService.getInstance()
+        .handshake(info, SESSION_MANAGER.getCurrSession().getClientAddress(), null, null);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
index 2b6cb050d9..ccc95932b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.commons.sync.pipe.PipeMessage;
 import org.apache.iotdb.commons.sync.pipe.PipeStatus;
 import org.apache.iotdb.commons.sync.pipe.TsFilePipeInfo;
 import org.apache.iotdb.commons.sync.pipesink.PipeSink;
+import org.apache.iotdb.commons.sync.transport.SyncIdentityInfo;
 import org.apache.iotdb.commons.sync.utils.SyncConstant;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
@@ -110,9 +111,10 @@ public class SyncService implements IService {
 
   public TSStatus handshake(
       TSyncIdentityInfo identityInfo,
+      String remoteAddress,
       IPartitionFetcher partitionFetcher,
       ISchemaFetcher schemaFetcher) {
-    return receiverManager.handshake(identityInfo, partitionFetcher, schemaFetcher);
+    return receiverManager.handshake(identityInfo, remoteAddress, partitionFetcher, schemaFetcher);
   }
 
   public TSStatus transportFile(TSyncTransportMetaInfo metaInfo, ByteBuffer buff)
@@ -345,7 +347,7 @@ public class SyncService implements IService {
     }
   }
 
-  public synchronized void recordMessage(String pipeName, PipeMessage message) {
+  public void recordMessage(String pipeName, PipeMessage message) {
     if (!pipes.containsKey(pipeName)) {
       logger.warn(String.format("No running PIPE for message %s.", message));
       return;
@@ -353,17 +355,13 @@ public class SyncService implements IService {
     TSStatus status = null;
     switch (message.getType()) {
       case ERROR:
-        logger.error("{}", message);
+        logger.error(
+            "Error occurred when executing PIPE [{}] because {}.", pipeName, message.getMessage());
         status = syncInfoFetcher.recordMsg(pipeName, message);
-        try {
-          stopPipe(pipeName);
-        } catch (PipeException e) {
-          logger.error(
-              String.format("Stop PIPE %s when meeting error in sender service.", pipeName), e);
-        }
         break;
       case WARN:
-        logger.warn("{}", message);
+        logger.error(
+            "Warn occurred when executing PIPE [{}] because {}.", pipeName, message.getMessage());
         status = syncInfoFetcher.recordMsg(pipeName, message);
         break;
       default:
@@ -396,14 +394,14 @@ public class SyncService implements IService {
   public List<TShowPipeInfo> showPipeForReceiver(String pipeName) {
     boolean showAll = StringUtils.isEmpty(pipeName);
     List<TShowPipeInfo> list = new ArrayList<>();
-    for (TSyncIdentityInfo identityInfo : receiverManager.getAllTSyncIdentityInfos()) {
+    for (SyncIdentityInfo identityInfo : receiverManager.getAllTSyncIdentityInfos()) {
       if (showAll || pipeName.equals(identityInfo.getPipeName())) {
         TShowPipeInfo tPipeInfo =
             new TShowPipeInfo(
                 identityInfo.getCreateTime(),
                 identityInfo.getPipeName(),
                 SyncConstant.ROLE_RECEIVER,
-                identityInfo.getAddress(),
+                identityInfo.getRemoteAddress(),
                 PipeStatus.RUNNING.name(),
                 String.format("Database='%s'", identityInfo.getDatabase()),
                 // TODO: implement receiver message
@@ -483,7 +481,7 @@ public class SyncService implements IService {
     try {
       recover();
     } catch (Exception e) {
-      logger.error("Recover from disk error.", e);
+      logger.error("Recovery error.", e);
       throw new StartupException(e);
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
index fbcee83b13..e596c89ede 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
@@ -28,12 +28,14 @@ import org.apache.iotdb.commons.sync.pipesink.PipeSink;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
 import org.apache.iotdb.db.client.ConfigNodeClient;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
 import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
 import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -126,7 +128,15 @@ public class ClusterSyncInfoFetcher implements ISyncInfoFetcher {
 
   @Override
   public TSStatus recordMsg(String pipeName, PipeMessage message) {
-    return null;
+    try (ConfigNodeClient configNodeClient =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
+      TRecordPipeMessageReq req =
+          new TRecordPipeMessageReq(pipeName, message.serializeToByteBuffer());
+      return configNodeClient.recordPipeMessage(req);
+    } catch (Exception e) {
+      LOGGER.error("RecordMsg error because {}", e.getMessage(), e);
+      return RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, e.getMessage());
+    }
   }
 
   // endregion
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java
index 43ef452b70..b3d63a700c 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.db.sync.pipedata;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.sync.receiver.load.DeletionLoader;
-import org.apache.iotdb.db.sync.receiver.load.ILoader;
+import org.apache.iotdb.db.sync.pipedata.load.DeletionLoader;
+import org.apache.iotdb.db.sync.pipedata.load.ILoader;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.slf4j.Logger;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/PipeData.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/PipeData.java
index fa9fcbbe3f..41a9870bcb 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/PipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/PipeData.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.sync.pipedata;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.db.sync.receiver.load.ILoader;
+import org.apache.iotdb.db.sync.pipedata.load.ILoader;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/TsFilePipeData.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/TsFilePipeData.java
index 6eb4e4dca6..c8b910ff85 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/TsFilePipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/TsFilePipeData.java
@@ -23,8 +23,8 @@ import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.sync.utils.SyncConstant;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.sync.receiver.load.ILoader;
-import org.apache.iotdb.db.sync.receiver.load.TsFileLoader;
+import org.apache.iotdb.db.sync.pipedata.load.ILoader;
+import org.apache.iotdb.db.sync.pipedata.load.TsFileLoader;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.slf4j.Logger;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/DeletionLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/DeletionLoader.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/sync/receiver/load/DeletionLoader.java
rename to server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/DeletionLoader.java
index ed79fdcf1a..ab840886fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/DeletionLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/DeletionLoader.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.sync.receiver.load;
+package org.apache.iotdb.db.sync.pipedata.load;
 
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.exception.sync.PipeDataLoadException;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/ILoader.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/ILoader.java
similarity index 97%
rename from server/src/main/java/org/apache/iotdb/db/sync/receiver/load/ILoader.java
rename to server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/ILoader.java
index 36ba0da5ea..754893d562 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/ILoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/ILoader.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.sync.receiver.load;
+package org.apache.iotdb.db.sync.pipedata.load;
 
 import org.apache.iotdb.commons.exception.sync.PipeDataLoadException;
 import org.apache.iotdb.db.conf.IoTDBConfig;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/TsFileLoader.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java
rename to server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/TsFileLoader.java
index e86c10ec46..6d80266b5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/TsFileLoader.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.sync.receiver.load;
+package org.apache.iotdb.db.sync.pipedata.load;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.sync.PipeDataLoadException;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
index 19426575a6..26852c4429 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
@@ -148,6 +148,8 @@ public class TsFilePipe implements Pipe {
     if (pipeInfo.getStatus() == PipeStatus.RUNNING) {
       return;
     }
+    // check connection
+    senderManager.checkConnection();
 
     // init sync manager
     List<DataRegion> dataRegions = StorageEngineV2.getInstance().getAllDataRegions();
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ISyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ISyncClient.java
index b92192cf27..a1008df970 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ISyncClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ISyncClient.java
@@ -19,17 +19,18 @@
  */
 package org.apache.iotdb.db.sync.transport.client;
 
-import org.apache.iotdb.db.exception.SyncConnectionException;
+import org.apache.iotdb.commons.exception.sync.SyncConnectionException;
+import org.apache.iotdb.commons.exception.sync.SyncHandshakeException;
 import org.apache.iotdb.db.sync.pipedata.PipeData;
 
 public interface ISyncClient {
   /**
    * Create connection and handshake before sending messages
    *
-   * @return true if success; false if failed to check IoTDB version.
    * @throws SyncConnectionException cannot create connection to receiver
+   * @throws SyncHandshakeException cannot handshake with receiver
    */
-  boolean handshake() throws SyncConnectionException;
+  void handshake() throws SyncConnectionException;
 
   /**
    * Send {@link PipeData} to receiver and load.
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSyncClient.java
index 6250773b1a..358a83901c 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSyncClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSyncClient.java
@@ -20,10 +20,11 @@
 package org.apache.iotdb.db.sync.transport.client;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.sync.SyncConnectionException;
+import org.apache.iotdb.commons.exception.sync.SyncHandshakeException;
 import org.apache.iotdb.commons.sync.utils.SyncConstant;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.SyncConnectionException;
 import org.apache.iotdb.db.sync.pipedata.PipeData;
 import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
 import org.apache.iotdb.db.sync.sender.pipe.Pipe;
@@ -67,8 +68,6 @@ public class IoTDBSyncClient implements ISyncClient {
   private final String ipAddress;
   /* remote port */
   private final int port;
-  /* local IP address*/
-  private final String localIP;
   /* database name that client belongs to*/
   private final String databaseName;
 
@@ -80,16 +79,13 @@ public class IoTDBSyncClient implements ISyncClient {
    * @param pipe sync task
    * @param remoteAddress remote ip address
    * @param port remote port
-   * @param localAddress local ip address
    * @param databaseName database name that client belongs to
    */
-  public IoTDBSyncClient(
-      Pipe pipe, String remoteAddress, int port, String localAddress, String databaseName) {
+  public IoTDBSyncClient(Pipe pipe, String remoteAddress, int port, String databaseName) {
     RpcTransportFactory.setThriftMaxFrameSize(config.getThriftMaxFrameSize());
     this.pipe = pipe;
     this.ipAddress = remoteAddress;
     this.port = port;
-    this.localIP = localAddress;
     this.databaseName = databaseName;
   }
 
@@ -99,20 +95,20 @@ public class IoTDBSyncClient implements ISyncClient {
    * @param pipe sync task
    * @param remoteAddress remote ip address
    * @param port remote port
-   * @param localAddress local ip address
    */
-  public IoTDBSyncClient(Pipe pipe, String remoteAddress, int port, String localAddress) {
-    this(pipe, remoteAddress, port, localAddress, "");
+  public IoTDBSyncClient(Pipe pipe, String remoteAddress, int port) {
+    this(pipe, remoteAddress, port, "");
   }
 
   /**
    * Create thrift connection to receiver. Check IoTDB version to make sure compatibility
    *
-   * @return true if success; false if failed to check IoTDB version.
+   * @return true if S; false if failed to check IoTDB version.
    * @throws SyncConnectionException cannot create connection to receiver
+   * @throws SyncHandshakeException cannot handshake with receiver
    */
   @Override
-  public synchronized boolean handshake() throws SyncConnectionException {
+  public synchronized void handshake() throws SyncConnectionException {
     if (transport != null && transport.isOpen()) {
       transport.close();
     }
@@ -141,21 +137,17 @@ public class IoTDBSyncClient implements ISyncClient {
 
       TSyncIdentityInfo identityInfo =
           new TSyncIdentityInfo(
-              localIP,
-              pipe.getName(),
-              pipe.getCreateTime(),
-              config.getIoTDBMajorVersion(),
-              databaseName);
+              pipe.getName(), pipe.getCreateTime(), config.getIoTDBMajorVersion(), databaseName);
       TSStatus status = serviceClient.handshake(identityInfo);
       if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        logger.error("The receiver rejected the synchronization task because {}", status.message);
-        return false;
+        throw new SyncHandshakeException(
+            String.format(
+                "the receiver rejected the synchronization task because %s", status.message));
       }
     } catch (TException e) {
       throw new SyncConnectionException(
-          String.format("Cannot connect to the receiver because %s.", e.getMessage()));
+          String.format("cannot connect to the receiver because %s", e.getMessage()));
     }
-    return true;
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/SenderManager.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/SenderManager.java
index b91405bd30..0615f19a3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/SenderManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/SenderManager.java
@@ -23,11 +23,12 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.exception.sync.PipeException;
+import org.apache.iotdb.commons.exception.sync.SyncConnectionException;
+import org.apache.iotdb.commons.exception.sync.SyncHandshakeException;
 import org.apache.iotdb.commons.sync.pipe.PipeMessage;
 import org.apache.iotdb.commons.sync.pipesink.PipeSink;
 import org.apache.iotdb.commons.sync.utils.SyncConstant;
 import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.exception.SyncConnectionException;
 import org.apache.iotdb.db.sync.SyncService;
 import org.apache.iotdb.db.sync.pipedata.PipeData;
 import org.apache.iotdb.db.sync.sender.pipe.Pipe;
@@ -72,6 +73,8 @@ public class SenderManager {
 
   private boolean isRunning;
 
+  private boolean isError = false;
+
   public SenderManager(Pipe pipe, PipeSink pipeSink) {
     this.pipe = pipe;
     this.pipeSink = pipeSink;
@@ -86,6 +89,20 @@ public class SenderManager {
     this.isRunning = false;
   }
 
+  public void checkConnection() {
+    ISyncClient client = SyncClientFactory.createHeartbeatClient(pipe, pipeSink);
+    try {
+      client.handshake();
+    } catch (SyncConnectionException syncConnectionException) {
+      logger.warn(
+          "Cannot connect to the receiver {} when starting PIPE check because {}, PIPE will keep RUNNING and try to reconnect",
+          pipeSink,
+          syncConnectionException.getMessage());
+    } finally {
+      client.close();
+    }
+  }
+
   public void start() {
     blockingQueue.clear();
     lastReportTime = System.currentTimeMillis();
@@ -169,12 +186,22 @@ public class SenderManager {
             object.notify();
           }
         }
+        isError = false;
       } catch (SyncConnectionException e) {
+        if (e instanceof SyncHandshakeException && !isError) {
+          SyncService.getInstance()
+              .recordMessage(
+                  pipe.getName(),
+                  new PipeMessage(
+                      PipeMessage.PipeMessageType.ERROR,
+                      String.format("Can not handshake with %s", pipeSink)));
+          isError = true;
+        }
         blockingQueue.offer(object);
         long reportInterval = System.currentTimeMillis() - lastReportTime;
         if (reportInterval > SyncConstant.LOST_CONNECT_REPORT_MILLISECONDS) {
           logger.warn(
-              "Connection error because {}, lost contact with the receiver {} for {} milliseconds.",
+              "Connection error because {}. Lost contact with the receiver {} for {} milliseconds.",
               e.getMessage(),
               pipeSink,
               System.currentTimeMillis() - lostConnectionTime);
@@ -184,7 +211,7 @@ public class SenderManager {
         client.close();
       }
     } catch (InterruptedException e) {
-      e.printStackTrace();
+      logger.info("Interrupted by PIPE operation, exit heartbeat.");
     }
   }
 
@@ -194,14 +221,7 @@ public class SenderManager {
       synchronized (lock) {
         while (!Thread.currentThread().isInterrupted()) {
           try {
-            if (!syncClient.handshake()) {
-              SyncService.getInstance()
-                  .recordMessage(
-                      pipe.getName(),
-                      new PipeMessage(
-                          PipeMessage.PipeMessageType.ERROR,
-                          String.format("Can not handshake with %s", pipeSink)));
-            }
+            syncClient.handshake();
             while (!Thread.currentThread().isInterrupted()) {
               PipeData pipeData = pipe.take(dataRegionId);
               if (!syncClient.send(pipeData)) {
@@ -219,9 +239,10 @@ public class SenderManager {
               pipe.commit(dataRegionId);
             }
           } catch (SyncConnectionException e) {
-            // If failed to connect to receiver, it will hang up until scheduled heartbeat task
+            // If failed to connect to receiver or failed to handshake with receiver, it will hang
+            // up until scheduled heartbeat task
             // successfully reconnect to receiver.
-            logger.error("Connect to receiver {} error, because {}.", pipeSink, e.getMessage(), e);
+            logger.error("Connect to receiver {} error, because {}.", pipeSink, e.getMessage());
             lostConnectionTime = Math.min(lostConnectionTime, System.currentTimeMillis());
             blockingQueue.offer(lock);
             lock.wait();
@@ -229,7 +250,7 @@ public class SenderManager {
         }
       }
     } catch (InterruptedException e) {
-      logger.info("Interrupted by pipe, exit transport.");
+      logger.info("Interrupted by PIPE operation, exit transport.");
     } finally {
       syncClient.close();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/SyncClientFactory.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/SyncClientFactory.java
index 12c1df9800..f8bd4fa0ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/SyncClientFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/SyncClientFactory.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.sync.transport.client;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.sync.pipesink.IoTDBPipeSink;
 import org.apache.iotdb.commons.sync.pipesink.PipeSink;
-import org.apache.iotdb.commons.sync.utils.SyncConstant;
 import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.sync.sender.pipe.Pipe;
@@ -29,11 +28,6 @@ import org.apache.iotdb.db.sync.sender.pipe.Pipe;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.net.SocketException;
-import java.net.UnknownHostException;
-
 public class SyncClientFactory {
 
   private static final Logger logger = LoggerFactory.getLogger(SyncClientFactory.class);
@@ -46,11 +40,7 @@ public class SyncClientFactory {
       case IoTDB:
         IoTDBPipeSink ioTDBPipeSink = (IoTDBPipeSink) pipeSink;
         return new IoTDBSyncClient(
-            pipe,
-            ioTDBPipeSink.getIp(),
-            ioTDBPipeSink.getPort(),
-            getLocalIP(ioTDBPipeSink),
-            dataRegion.getStorageGroupName());
+            pipe, ioTDBPipeSink.getIp(), ioTDBPipeSink.getPort(), dataRegion.getStorageGroupName());
       case ExternalPipe:
       default:
         throw new UnsupportedOperationException();
@@ -61,30 +51,10 @@ public class SyncClientFactory {
     switch (pipeSink.getType()) {
       case IoTDB:
         IoTDBPipeSink ioTDBPipeSink = (IoTDBPipeSink) pipeSink;
-        return new IoTDBSyncClient(
-            pipe, ioTDBPipeSink.getIp(), ioTDBPipeSink.getPort(), getLocalIP(ioTDBPipeSink));
+        return new IoTDBSyncClient(pipe, ioTDBPipeSink.getIp(), ioTDBPipeSink.getPort());
       case ExternalPipe:
       default:
         throw new UnsupportedOperationException();
     }
   }
-
-  private static String getLocalIP(IoTDBPipeSink pipeSink) {
-    String localIP;
-    try {
-      InetAddress inetAddress = InetAddress.getLocalHost();
-      if (inetAddress.isLoopbackAddress()) {
-        try (final DatagramSocket socket = new DatagramSocket()) {
-          socket.connect(InetAddress.getByName(pipeSink.getIp()), pipeSink.getPort());
-          localIP = socket.getLocalAddress().getHostAddress();
-        }
-      } else {
-        localIP = inetAddress.getHostAddress();
-      }
-    } catch (UnknownHostException | SocketException e) {
-      logger.error("Get local host error when create transport handler.", e);
-      localIP = SyncConstant.UNKNOWN_IP;
-    }
-    return localIP;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
index 7d8ee67d90..d0c348fda1 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.sync.PipeDataLoadException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.sync.transport.SyncIdentityInfo;
 import org.apache.iotdb.commons.sync.utils.SyncConstant;
 import org.apache.iotdb.commons.sync.utils.SyncPathUtil;
 import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -40,6 +41,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
 import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,7 +69,7 @@ public class ReceiverManager {
   // When the client abnormally exits, we can still know who to disconnect
   private final ThreadLocal<Long> currentConnectionId;
   // Record the remote message for every rpc connection
-  private final Map<Long, TSyncIdentityInfo> connectionIdToIdentityInfoMap;
+  private final Map<Long, SyncIdentityInfo> connectionIdToIdentityInfoMap;
   // Record the remote message for every rpc connection
   private final Map<Long, Map<String, Long>> connectionIdToStartIndexRecord;
   private final Map<String, String> registeredDatabase;
@@ -149,22 +151,26 @@ public class ReceiverManager {
    *     TSStatusCode#SUCCESS_STATUS} if success to connect.
    */
   public TSStatus handshake(
-      TSyncIdentityInfo identityInfo,
+      TSyncIdentityInfo tIdentityInfo,
+      String remoteAddress,
       IPartitionFetcher partitionFetcher,
       ISchemaFetcher schemaFetcher) {
-    logger.info("Invoke handshake method from client ip = {}", identityInfo.address);
+    SyncIdentityInfo identityInfo = new SyncIdentityInfo(tIdentityInfo, remoteAddress);
+    logger.info("Invoke handshake method from client ip = {}", identityInfo.getRemoteAddress());
     // check ip address
-    if (!verifyIPSegment(config.getIpWhiteList(), identityInfo.address)) {
+    if (!verifyIPSegment(config.getIpWhiteList(), identityInfo.getRemoteAddress())) {
       return RpcUtils.getStatus(
           TSStatusCode.PIPESERVER_ERROR,
-          "Sender IP is not in the white list of receiver IP and synchronization tasks are not allowed.");
+          String.format(
+              "permission is not allowed: the sender IP <%s>, the white list of receiver <%s>",
+              identityInfo.getRemoteAddress(), config.getIpWhiteList()));
     }
     // Version check
     if (!config.getIoTDBMajorVersion(identityInfo.version).equals(config.getIoTDBMajorVersion())) {
       return RpcUtils.getStatus(
           TSStatusCode.PIPESERVER_ERROR,
           String.format(
-              "Version mismatch: the sender <%s>, the receiver <%s>",
+              "version mismatch: the sender <%s>, the receiver <%s>",
               identityInfo.version, config.getIoTDBVersion()));
     }
 
@@ -172,10 +178,12 @@ public class ReceiverManager {
       new File(SyncPathUtil.getFileDataDirPath(identityInfo)).mkdirs();
     }
     createConnection(identityInfo);
-    if (!registerDatabase(identityInfo.getDatabase(), partitionFetcher, schemaFetcher)) {
-      return RpcUtils.getStatus(
-          TSStatusCode.PIPESERVER_ERROR,
-          String.format("Auto register database %s error.", identityInfo.getDatabase()));
+    if (!StringUtils.isEmpty(identityInfo.getDatabase())) {
+      if (!registerDatabase(identityInfo.getDatabase(), partitionFetcher, schemaFetcher)) {
+        return RpcUtils.getStatus(
+            TSStatusCode.PIPESERVER_ERROR,
+            String.format("Auto register database %s error.", identityInfo.getDatabase()));
+      }
     }
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
   }
@@ -226,15 +234,16 @@ public class ReceiverManager {
    * @return {@link TSStatusCode#PIPESERVER_ERROR} if fail to receive or load; {@link
    *     TSStatusCode#SUCCESS_STATUS} if load successfully.
    * @throws TException The connection between the sender and the receiver has not been established
-   *     by {@link ReceiverManager#handshake(TSyncIdentityInfo, IPartitionFetcher, ISchemaFetcher)}
+   *     by {@link ReceiverManager#handshake}
    */
   public TSStatus transportPipeData(ByteBuffer buff) throws TException {
     // step1. check connection
-    TSyncIdentityInfo identityInfo = getCurrentTSyncIdentityInfo();
+    SyncIdentityInfo identityInfo = getCurrentSyncIdentityInfo();
     if (identityInfo == null) {
       throw new TException("Thrift connection is not alive.");
     }
-    logger.debug("Invoke transportPipeData method from client ip = {}", identityInfo.address);
+    logger.debug(
+        "Invoke transportPipeData method from client ip = {}", identityInfo.getRemoteAddress());
     String fileDir = SyncPathUtil.getFileDataDirPath(identityInfo);
 
     // step2. deserialize PipeData
@@ -281,16 +290,17 @@ public class ReceiverManager {
    *     TSStatusCode#SYNC_FILE_REDIRECTION_ERROR} if startIndex needs to rollback because
    *     mismatched; {@link TSStatusCode#SYNC_FILE_ERROR} if fail to receive file.
    * @throws TException The connection between the sender and the receiver has not been established
-   *     by {@link ReceiverManager#handshake(TSyncIdentityInfo, IPartitionFetcher, ISchemaFetcher)}
+   *     by {@link ReceiverManager#handshake}
    */
   public TSStatus transportFile(TSyncTransportMetaInfo metaInfo, ByteBuffer buff)
       throws TException {
     // step1. check connection
-    TSyncIdentityInfo identityInfo = getCurrentTSyncIdentityInfo();
+    SyncIdentityInfo identityInfo = getCurrentSyncIdentityInfo();
     if (identityInfo == null) {
       throw new TException("Thrift connection is not alive.");
     }
-    logger.debug("Invoke transportData method from client ip = {}", identityInfo.address);
+    logger.debug(
+        "Invoke transportData method from client ip = {}", identityInfo.getRemoteAddress());
 
     String fileDir = SyncPathUtil.getFileDataDirPath(identityInfo);
     String fileName = metaInfo.fileName;
@@ -371,11 +381,11 @@ public class ReceiverManager {
   }
 
   /**
-   * Get current TSyncIdentityInfo
+   * Get current SyncIdentityInfo
    *
    * @return null if connection has been exited
    */
-  private TSyncIdentityInfo getCurrentTSyncIdentityInfo() {
+  private SyncIdentityInfo getCurrentSyncIdentityInfo() {
     Long id = currentConnectionId.get();
     if (id != null) {
       return connectionIdToIdentityInfoMap.get(id);
@@ -385,7 +395,7 @@ public class ReceiverManager {
   }
 
   /**
-   * Get current TSyncIdentityInfo
+   * Get current FileStartIndex
    *
    * @return startIndex of file: -1 if file doesn't exist
    */
@@ -400,7 +410,7 @@ public class ReceiverManager {
     return -1;
   }
 
-  private void createConnection(TSyncIdentityInfo identityInfo) {
+  private void createConnection(SyncIdentityInfo identityInfo) {
     long connectionId = connectionIdGenerator.incrementAndGet();
     currentConnectionId.set(connectionId);
     connectionIdToIdentityInfoMap.put(connectionId, identityInfo);
@@ -447,12 +457,12 @@ public class ReceiverManager {
     if (checkConnection()) {
       long id = currentConnectionId.get();
       connectionIdToIdentityInfoMap.remove(id);
-      connectionIdToIdentityInfoMap.remove(id);
+      connectionIdToStartIndexRecord.remove(id);
       currentConnectionId.remove();
     }
   }
 
-  public List<TSyncIdentityInfo> getAllTSyncIdentityInfos() {
+  public List<SyncIdentityInfo> getAllTSyncIdentityInfos() {
     return new ArrayList<>(connectionIdToIdentityInfoMap.values());
   }
 
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 8d5698fa09..b0402f2078 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -504,6 +504,11 @@ struct TGetPathsSetTemplatesResp {
 }
 
 // SYNC
+struct TRecordPipeMessageReq{
+  1: required string pipeName
+  2: required binary message
+}
+
 struct TShowPipeInfo {
   1: required i64 createTime
   2: required string pipeName
@@ -1041,6 +1046,9 @@ service IConfigNodeRPCService {
   /* Get all pipe information. It is used for DataNode registration and restart*/
   TGetAllPipeInfoResp getAllPipeInfo();
 
+  /* Get all pipe information. It is used for DataNode registration and restart*/
+  common.TSStatus recordPipeMessage(TRecordPipeMessageReq req);
+
   // ======================================================
   // TestTools
   // ======================================================
diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift
index 130e81aa86..25f391fd24 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -418,14 +418,12 @@ struct TSDropSchemaTemplateReq {
 
 // The sender and receiver need to check some info to confirm validity
 struct TSyncIdentityInfo{
-  // Check whether the ip of sender is in the white list of receiver.
-  1:required string address
   // Sender needs to tell receiver its identity.
-  2:required string pipeName
-  3:required i64 createTime
+  1:required string pipeName
+  2:required i64 createTime
   // The version of sender and receiver need to be the same.
-  4:required string version
-  5:required string database
+  3:required string version
+  4:required string database
 }
 
 struct TSyncTransportMetaInfo{