You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/04/27 06:10:50 UTC

[iotdb] branch master updated: [IOTDB-2938]Some improvements and bug fix for New Sync (#5563)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6946f3031a [IOTDB-2938]Some improvements and bug fix for New Sync (#5563)
6946f3031a is described below

commit 6946f3031af64a59845aa17358b3b7ae42799af4
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Wed Apr 27 14:10:45 2022 +0800

    [IOTDB-2938]Some improvements and bug fix for New Sync (#5563)
    
    * 1.refactor sender IT and add delete and restart cases, 2.add close in TsFilePipe, 3.fix delete storage group
    
    * spotless
    
    * 5.change pipe data to pipe log for sender in SyncPathUtil
    
    * 5. change sender dir structure
    
    * 6. add MsgManager
    
    * add dir structure in SyncPathUtil and remove create Msg in TransportClient
    
    * fix restart MsgManager and add Connect timeout and Socket timeout for transport in TransportClient
    
    * spotless
    
    * delete System.out.println in SenderIT
---
 .../sync/IoTDBSyncReceiverCollectorIT.java         |  12 +-
 .../db/integration/sync/IoTDBSyncSenderIT.java     | 327 +++++++++++++++++++--
 .../db/integration/sync/TransportClientMock.java   |  12 +-
 .../db/integration/sync/TransportHandlerMock.java  |  61 ++++
 .../iotdb/db/localconfignode/LocalConfigNode.java  |  13 +-
 .../iotdb/db/metadata/LocalSchemaProcessor.java    |   6 +-
 .../db/qp/physical/sys/DeleteTimeSeriesPlan.java   |  12 +
 .../java/org/apache/iotdb/db/service/IoTDB.java    |   2 +-
 .../apache/iotdb/db/sync/conf/SyncConstant.java    |  54 ++--
 .../apache/iotdb/db/sync/conf/SyncPathUtil.java    |  82 ++++--
 .../iotdb/db/sync/pipedata/DeletionPipeData.java   |   2 +-
 .../sync/pipedata/queue/BufferedPipeDataQueue.java |  31 +-
 .../db/sync/pipedata/queue/PipeDataQueue.java      |   2 +
 .../db/sync/receiver/collector/Collector.java      |   4 +-
 .../db/sync/receiver/manager/ReceiverManager.java  |   4 +-
 .../db/sync/sender/manager/SchemaSyncManager.java  |  18 +-
 .../iotdb/db/sync/sender/pipe/IoTDBPipeSink.java   |  18 ++
 .../org/apache/iotdb/db/sync/sender/pipe/Pipe.java |  10 +
 .../iotdb/db/sync/sender/pipe/TsFilePipe.java      |  61 +++-
 .../db/sync/sender/recovery/SenderLogAnalyzer.java |  25 +-
 .../db/sync/sender/recovery/SenderLogger.java      |  12 +-
 .../db/sync/sender/recovery/TsFilePipeLogger.java  |   2 +-
 .../iotdb/db/sync/sender/service/MsgManager.java   | 114 +++++++
 .../db/sync/sender/service/SenderService.java      |  81 ++---
 .../db/sync/sender/service/TransportHandler.java   |  47 ++-
 .../db/sync/transport/client/TransportClient.java  |  35 ++-
 .../sync/pipedata/BufferedPipeDataQueueTest.java   |  26 +-
 .../receiver/recovery/ReceiverLogAnalyzerTest.java |   4 +-
 28 files changed, 829 insertions(+), 248 deletions(-)

diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverCollectorIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverCollectorIT.java
index 67df8a3012..11b99be8c7 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverCollectorIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverCollectorIT.java
@@ -131,7 +131,7 @@ public class IoTDBSyncReceiverCollectorIT {
     outputStream.writeLong(-1);
     outputStream.close();
     int serialNum = 0;
-    File pipeLog1 = new File(pipeLogDir1.getPath(), SyncConstant.getPipeLogName(serialNum));
+    File pipeLog1 = new File(pipeLogDir1.getPath(), SyncPathUtil.getPipeLogName(serialNum));
     DataOutputStream pipeLogOutput = new DataOutputStream(new FileOutputStream(pipeLog1, false));
     List<PhysicalPlan> planList = new ArrayList<>();
     planList.add(new SetStorageGroupPlan(new PartialPath("root.vehicle")));
@@ -182,7 +182,7 @@ public class IoTDBSyncReceiverCollectorIT {
       pipeData.serialize(pipeLogOutput);
     }
     pipeLogOutput.close();
-    File pipeLog2 = new File(pipeLogDir1.getPath(), SyncConstant.getPipeLogName(serialNum));
+    File pipeLog2 = new File(pipeLogDir1.getPath(), SyncPathUtil.getPipeLogName(serialNum));
     pipeLogOutput = new DataOutputStream(new FileOutputStream(pipeLog2, false));
     List<File> tsFiles = SyncTestUtil.getTsFilePaths(tmpDir);
     for (File f : tsFiles) {
@@ -303,7 +303,7 @@ public class IoTDBSyncReceiverCollectorIT {
     outputStream.writeLong(-1);
     outputStream.close();
     int serialNum1 = 0;
-    File pipeLog1 = new File(pipeLogDir1.getPath(), SyncConstant.getPipeLogName(serialNum1));
+    File pipeLog1 = new File(pipeLogDir1.getPath(), SyncPathUtil.getPipeLogName(serialNum1));
     DataOutputStream pipeLogOutput = new DataOutputStream(new FileOutputStream(pipeLog1, false));
     List<PhysicalPlan> planList = new ArrayList<>();
     planList.add(new SetStorageGroupPlan(new PartialPath("root.vehicle")));
@@ -328,7 +328,7 @@ public class IoTDBSyncReceiverCollectorIT {
       pipeData.serialize(pipeLogOutput);
     }
     pipeLogOutput.close();
-    File pipeLog2 = new File(pipeLogDir1.getPath(), SyncConstant.getPipeLogName(serialNum1));
+    File pipeLog2 = new File(pipeLogDir1.getPath(), SyncPathUtil.getPipeLogName(serialNum1));
     pipeLogOutput = new DataOutputStream(new FileOutputStream(pipeLog2, false));
     List<File> tsFiles =
         SyncTestUtil.getTsFilePaths(new File(tmpDir, "sequence" + File.separator + "root.vehicle"));
@@ -355,7 +355,7 @@ public class IoTDBSyncReceiverCollectorIT {
             new FileOutputStream(new File(pipeLogDir2, SyncConstant.COMMIT_LOG_NAME), true));
     outputStream.writeLong(-1);
     outputStream.close();
-    pipeLog1 = new File(pipeLogDir2.getPath(), SyncConstant.getPipeLogName(serialNum2));
+    pipeLog1 = new File(pipeLogDir2.getPath(), SyncPathUtil.getPipeLogName(serialNum2));
     pipeLogOutput = new DataOutputStream(new FileOutputStream(pipeLog1, false));
     pipeData =
         new SchemaPipeData(new SetStorageGroupPlan(new PartialPath("root.sg1")), serialNum2++);
@@ -389,7 +389,7 @@ public class IoTDBSyncReceiverCollectorIT {
             serialNum2++);
     pipeData.serialize(pipeLogOutput);
     pipeLogOutput.close();
-    pipeLog2 = new File(pipeLogDir2.getPath(), SyncConstant.getPipeLogName(serialNum2));
+    pipeLog2 = new File(pipeLogDir2.getPath(), SyncPathUtil.getPipeLogName(serialNum2));
     pipeLogOutput = new DataOutputStream(new FileOutputStream(pipeLog2, false));
     tsFiles =
         SyncTestUtil.getTsFilePaths(new File(tmpDir, "sequence" + File.separator + "root.sg1"));
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
index c9fa74c7f7..cacf6c6f57 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
@@ -19,11 +19,15 @@
 package org.apache.iotdb.db.integration.sync;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.sys.ShowPipeSinkTypePlan;
+import org.apache.iotdb.db.sync.pipedata.DeletionPipeData;
 import org.apache.iotdb.db.sync.pipedata.PipeData;
 import org.apache.iotdb.db.sync.pipedata.SchemaPipeData;
 import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
+import org.apache.iotdb.db.sync.sender.pipe.IoTDBPipeSink;
 import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
-import org.apache.iotdb.db.sync.sender.service.SenderService;
 import org.apache.iotdb.db.sync.sender.service.TransportHandler;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.itbase.category.LocalStandaloneTest;
@@ -38,7 +42,12 @@ import org.junit.experimental.categories.Category;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 @Category({LocalStandaloneTest.class})
 public class IoTDBSyncSenderIT {
@@ -49,8 +58,16 @@ public class IoTDBSyncSenderIT {
   private static final String pipeSinkName = "test_pipesink";
   private static final String pipeName = "test_pipe";
 
-  private TsFilePipe pipe;
-  private TransportClientMock mock;
+  private TransportHandlerMock handler;
+  private TransportClientMock transportClient;
+
+  private final Map<String, List<PipeData>> resultMap = new HashMap<>();
+  private static final TsFilePipeData simpleTsFilePipeData =
+      new TsFilePipeData("path", "tsfile", 0L);
+  private static final SchemaPipeData simpleSchemaPipeData =
+      new SchemaPipeData(new ShowPipeSinkTypePlan(), 0L);
+  private static final DeletionPipeData simpleDeletionPipeData =
+      new DeletionPipeData(new Deletion(new PartialPath(), 0L, 0L), 0L);
 
   @Before
   public void setUp() throws Exception {
@@ -65,6 +82,12 @@ public class IoTDBSyncSenderIT {
     IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
     IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false);
     Class.forName(Config.JDBC_DRIVER_NAME);
+
+    IoTDBPipeSink pipeSink = new IoTDBPipeSink(pipeSinkName);
+    TsFilePipe pipe = new TsFilePipe(0L, pipeName, pipeSink, 0L, true);
+    transportClient = new TransportClientMock(pipe, pipeSink.getIp(), pipeSink.getPort());
+    handler = new TransportHandlerMock(pipe, pipeSink, transportClient);
+    TransportHandler.setDebugTransportHandler(handler);
   }
 
   @After
@@ -80,7 +103,7 @@ public class IoTDBSyncSenderIT {
     EnvironmentUtils.cleanEnv();
   }
 
-  private void prepareSchema() throws Exception {
+  private void prepareSchema() throws Exception { // 8 schema plans
     try (Connection connection =
             DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
         Statement statement = connection.createStatement()) {
@@ -93,6 +116,18 @@ public class IoTDBSyncSenderIT {
       statement.execute("create timeseries root.sg2.d1.s0 with datatype=int32, encoding=PLAIN");
       statement.execute("create timeseries root.sg2.d2.s1 with datatype=boolean, encoding=PLAIN");
     }
+
+    List<PipeData> resultList = new ArrayList<>();
+    for (int i = 0; i < 4; i++) {
+      resultList.add(simpleSchemaPipeData);
+    }
+    resultMap.put("schemaWithDel3InHistory", resultList); // del3 in history
+
+    resultList = new ArrayList<>();
+    for (int i = 0; i < 8; i++) {
+      resultList.add(simpleSchemaPipeData);
+    }
+    resultMap.put("schema", resultList); // del3 do not in history
   }
 
   private void prepareIns1() throws Exception { // add one seq tsfile in sg1
@@ -107,6 +142,8 @@ public class IoTDBSyncSenderIT {
       statement.execute("insert into root.sg1.d2(timestamp, s4) values(1, 1)");
       statement.execute("flush");
     }
+
+    resultMap.put("ins1", Collections.singletonList(simpleTsFilePipeData));
   }
 
   private void prepareIns2() throws Exception { // add one seq tsfile in sg1
@@ -120,6 +157,8 @@ public class IoTDBSyncSenderIT {
       statement.execute("insert into root.sg1.d2(timestamp, s4) values(200, 100)");
       statement.execute("flush");
     }
+
+    resultMap.put("ins2", Collections.singletonList(simpleTsFilePipeData));
   }
 
   private void prepareIns3()
@@ -137,27 +176,72 @@ public class IoTDBSyncSenderIT {
           "insert into root.sg1.d1(timestamp, s1, s2, s3) values(200, 100, 16.65, 'h')");
       statement.execute("flush");
     }
+
+    resultMap.put(
+        "ins3",
+        Arrays.asList(
+            simpleTsFilePipeData, simpleTsFilePipeData, simpleTsFilePipeData)); // del3 in history
+    resultMap.put(
+        "ins3WithDel3InHistory",
+        Arrays.asList(simpleTsFilePipeData, simpleTsFilePipeData)); // del3 do not in history
+  }
+
+  private void prepareDel1() throws Exception { // after ins1, add 2 deletions
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute("delete from root.sg1.d1.s1 where time == 3");
+      statement.execute("delete from root.sg1.d1.s2 where time >= 1 and time <= 2");
+    }
+
+    resultMap.put("del1", Arrays.asList(simpleDeletionPipeData, simpleDeletionPipeData));
+  }
+
+  private void prepareDel2() throws Exception { // after ins2, add 3 deletions
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute("delete from root.sg1.d1.s3 where time <= 65");
+    }
+
+    resultMap.put(
+        "del2",
+        Arrays.asList(simpleDeletionPipeData, simpleDeletionPipeData, simpleDeletionPipeData));
+    resultMap.put("del2WithoutIns3", Arrays.asList(simpleDeletionPipeData, simpleDeletionPipeData));
+  }
+
+  private void prepareDel3() throws Exception { // after ins3, add 5 deletions, 2 schemas
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute("delete from root.sg1.d1.* where time <= 2");
+      statement.execute("delete timeseries root.sg1.d2.*");
+      statement.execute("delete storage group root.sg2");
+    }
+
+    List<PipeData> resultList = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      resultList.add(simpleDeletionPipeData);
+    }
+    for (int i = 0; i < 2; i++) {
+      resultList.add(simpleSchemaPipeData);
+    }
+    resultMap.put("del3", resultList);
   }
 
   private void preparePipeAndSetMock() throws Exception {
     try (Connection connection =
             DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
         Statement statement = connection.createStatement()) {
-      statement.execute("start pipeserver");
       statement.execute("create pipesink " + pipeSinkName + " as iotdb");
       statement.execute("create pipe " + pipeName + " to " + pipeSinkName);
-      pipe = (TsFilePipe) SenderService.getInstance().getRunningPipe();
-      mock =
-          new TransportClientMock(SenderService.getInstance().getRunningPipe(), "127.0.0.1", 2333);
-      TransportHandler handler =
-          new TransportHandler(
-              mock, pipeName, SenderService.getInstance().getRunningPipe().getCreateTime());
-      SenderService.getInstance().setTransportHandler(handler);
-      Thread.sleep(1000L);
-      statement.execute("stop pipeserver");
     }
   }
 
+  private void restart() throws Exception {
+    EnvironmentUtils.restartDaemon();
+  }
+
   private void startPipe() throws Exception {
     try (Connection connection =
             DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
@@ -182,8 +266,8 @@ public class IoTDBSyncSenderIT {
     }
   }
 
-  private void checkResult(List<PipeData> list) { // check ins1, ins2, ins3
-    Assert.assertEquals(list.size(), 13);
+  private void checkInsOnlyResult(List<PipeData> list) { // check ins1, ins2, ins3
+    Assert.assertEquals(13, list.size());
     for (int i = 0; i < 8; i++) {
       Assert.assertTrue(list.get(i) instanceof SchemaPipeData);
     }
@@ -192,18 +276,33 @@ public class IoTDBSyncSenderIT {
     }
   }
 
+  private void checkResult(List<String> resultString, List<PipeData> list) {
+    int totalNumber = 0;
+    for (String string : resultString) {
+      totalNumber += resultMap.get(string).size();
+    }
+    Assert.assertEquals(totalNumber, list.size());
+    int cnt = 0;
+    for (String string : resultString) {
+      for (PipeData pipeData : resultMap.get(string)) {
+        Assert.assertEquals(pipeData.getType(), list.get(cnt++).getType());
+      }
+    }
+  }
+
   @Test
   public void testHistoryInsert() {
     try {
-      prepareSchema();
+      prepareSchema(); // history
       prepareIns1();
       prepareIns2();
       prepareIns3();
 
-      preparePipeAndSetMock();
+      preparePipeAndSetMock(); // realtime
       startPipe();
-      Thread.sleep(1000L);
-      checkResult(mock.getPipeDataList());
+
+      Thread.sleep(1000L); // check
+      checkInsOnlyResult(transportClient.getPipeDataList());
     } catch (Exception e) {
       e.printStackTrace();
       Assert.fail();
@@ -221,16 +320,17 @@ public class IoTDBSyncSenderIT {
   @Test
   public void testHistoryAndRealTimeInsert() {
     try {
-      prepareSchema();
+      prepareSchema(); // history
       prepareIns1();
       prepareIns2();
 
-      preparePipeAndSetMock();
+      preparePipeAndSetMock(); // realtime
       startPipe();
       Thread.sleep(1000L);
       prepareIns3();
-      Thread.sleep(1000L);
-      checkResult(mock.getPipeDataList());
+
+      Thread.sleep(1000L); // check
+      checkInsOnlyResult(transportClient.getPipeDataList());
     } catch (Exception e) {
       e.printStackTrace();
       Assert.fail();
@@ -248,17 +348,18 @@ public class IoTDBSyncSenderIT {
   @Test
   public void testStopAndStartInsert() {
     try {
-      prepareSchema();
+      prepareSchema(); // history
       prepareIns1();
 
-      preparePipeAndSetMock();
+      preparePipeAndSetMock(); // realtime
       startPipe();
       prepareIns2();
       stopPipe();
       prepareIns3();
       startPipe();
-      Thread.sleep(1000L);
-      checkResult(mock.getPipeDataList());
+
+      Thread.sleep(1000L); // check
+      checkInsOnlyResult(transportClient.getPipeDataList());
     } catch (Exception e) {
       e.printStackTrace();
       Assert.fail();
@@ -274,20 +375,182 @@ public class IoTDBSyncSenderIT {
   }
 
   @Test
-  public void testRealTimeSchemaAndStopInsert() {
+  public void testRealTimeAndStopInsert() {
     try {
-      preparePipeAndSetMock();
+      preparePipeAndSetMock(); // realtime
+      startPipe();
       prepareSchema();
+      prepareIns1();
+      stopPipe();
+      prepareIns2();
+      startPipe();
+      prepareIns3();
+      stopPipe();
+
+      Thread.sleep(1000L); // check
+      checkInsOnlyResult(transportClient.getPipeDataList());
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
+    } finally {
+      try {
+        dropPipe();
+        Thread.sleep(1000L);
+      } catch (Exception e) {
+        e.printStackTrace();
+        Assert.fail();
+      }
+    }
+  }
+
+  @Test
+  public void testHistoryDel() {
+    try {
+      prepareSchema(); // history
+      prepareIns1();
+      prepareIns2();
+      prepareIns3();
+      prepareDel1();
+      prepareDel2();
+      prepareDel3();
+
+      preparePipeAndSetMock(); // realtime
+      startPipe();
+
+      Thread.sleep(1000L); // check
+      checkResult(
+          Arrays.asList("schemaWithDel3InHistory", "ins1", "ins2", "ins3WithDel3InHistory"),
+          transportClient.getPipeDataList());
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
+    } finally {
+      try {
+        dropPipe();
+        Thread.sleep(1000L);
+      } catch (Exception e) {
+        e.printStackTrace();
+        Assert.fail();
+      }
+    }
+  }
+
+  @Test
+  public void testRealtimeDel() {
+    try {
+      prepareSchema(); // history
+      prepareIns1();
+
+      preparePipeAndSetMock(); // realtime
+      startPipe();
+      prepareIns2();
+      prepareDel1();
+      stopPipe();
+      prepareIns3();
       startPipe();
+      prepareDel2();
+      prepareDel3();
+      stopPipe();
+
+      Thread.sleep(1000L); // check
+      checkResult(
+          Arrays.asList("schema", "ins1", "ins2", "del1", "ins3", "del2", "del3"),
+          transportClient.getPipeDataList());
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
+    } finally {
+      try {
+        dropPipe();
+        Thread.sleep(1000L);
+      } catch (Exception e) {
+        e.printStackTrace();
+        Assert.fail();
+      }
+    }
+  }
+
+  @Test
+  public void testRestartWhileRunning() {
+    try {
+      prepareSchema(); // history
       prepareIns1();
+      prepareIns2();
+
+      preparePipeAndSetMock(); // realtime
+      startPipe();
+      restart();
+      prepareIns3();
+
+      Thread.sleep(1000L); // check
+      checkInsOnlyResult(transportClient.getPipeDataList());
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
+    } finally {
+      try {
+        dropPipe();
+        Thread.sleep(1000L);
+      } catch (Exception e) {
+        e.printStackTrace();
+        Assert.fail();
+      }
+    }
+  }
+
+  @Test
+  public void testRestartWhileStopping() {
+    try {
+      prepareSchema(); // history
+      prepareIns1();
+
+      preparePipeAndSetMock(); // realtime
+      startPipe();
+      prepareIns2();
       stopPipe();
+      restart();
+      prepareIns3();
+      startPipe();
+
+      Thread.sleep(1000L); // check
+      checkInsOnlyResult(transportClient.getPipeDataList());
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
+    } finally {
+      try {
+        dropPipe();
+        Thread.sleep(1000L);
+      } catch (Exception e) {
+        e.printStackTrace();
+        Assert.fail();
+      }
+    }
+  }
+
+  @Test
+  public void testRestartWithDel() {
+    try {
+      prepareSchema(); // history
+      prepareIns1();
+      prepareDel1();
+
+      preparePipeAndSetMock(); // realtime
+      startPipe();
       prepareIns2();
+      stopPipe();
+      prepareDel2();
+      restart();
       startPipe();
       prepareIns3();
       stopPipe();
+      prepareDel3();
+      startPipe();
 
-      Thread.sleep(1000L);
-      checkResult(mock.getPipeDataList());
+      Thread.sleep(1000L); // check
+      checkResult(
+          Arrays.asList("schema", "ins1", "ins2", "del2WithoutIns3", "ins3", "del3"),
+          transportClient.getPipeDataList());
     } catch (Exception e) {
       e.printStackTrace();
       Assert.fail();
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/TransportClientMock.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/TransportClientMock.java
index 702a72f6f9..1614e2e142 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/TransportClientMock.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/TransportClientMock.java
@@ -30,9 +30,9 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class TransportClientMock implements ITransportClient {
-  private final Pipe pipe;
-  private final String ipAddress;
-  private final int port;
+  private Pipe pipe;
+  private String ipAddress;
+  private int port;
 
   private List<PipeData> pipeDataList;
 
@@ -49,6 +49,12 @@ public class TransportClientMock implements ITransportClient {
     return new SyncResponse(ResponseType.INFO, "");
   }
 
+  public void resetInfo(Pipe pipe, String ipAddress, int port) {
+    this.pipe = pipe;
+    this.ipAddress = ipAddress;
+    this.port = port;
+  }
+
   @Override
   public void run() {
     try {
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/TransportHandlerMock.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/TransportHandlerMock.java
new file mode 100644
index 0000000000..be7ea71c85
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/TransportHandlerMock.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration.sync;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.db.sync.sender.pipe.IoTDBPipeSink;
+import org.apache.iotdb.db.sync.sender.pipe.Pipe;
+import org.apache.iotdb.db.sync.sender.service.TransportHandler;
+
+public class TransportHandlerMock extends TransportHandler {
+  private Pipe pipe;
+
+  public TransportHandlerMock(
+      Pipe pipe, IoTDBPipeSink pipeSink, TransportClientMock transportClientMock) {
+    super(pipe, pipeSink);
+    this.transportClient = transportClientMock;
+  }
+
+  @Override
+  protected void resetTransportClient(Pipe pipe) {
+    try {
+      super.close();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+    super.resetTransportClient(pipe);
+    IoTDBPipeSink pipeSink = (IoTDBPipeSink) pipe.getPipeSink();
+    ((TransportClientMock) this.transportClient)
+        .resetInfo(pipe, pipeSink.getIp(), pipeSink.getPort());
+    this.pipe = pipe;
+
+    this.transportExecutorService =
+        IoTDBThreadPoolFactory.newSingleThreadExecutor(
+            ThreadName.SYNC_SENDER_PIPE.getName() + "-" + pipe.getName());
+    this.heartbeatExecutorService =
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+            ThreadName.SYNC_SENDER_HEARTBEAT.getName() + "-" + pipe.getName());
+  }
+
+  public TransportClientMock getTransportClientMock() {
+    return (TransportClientMock) transportClient;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
index 2ea72f4711..07630b54b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
@@ -48,7 +48,7 @@ import org.apache.iotdb.db.metadata.utils.MetaUtils;
 import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.DropTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.PruneTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
@@ -65,7 +65,6 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -74,6 +73,8 @@ import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+
 /**
  * This class simulates the behaviour of configNode to manage the configs locally. The schema
  * configs include storage group, schema region and template. The data config is dataRegion.
@@ -230,6 +231,11 @@ public class LocalConfigNode {
   }
 
   public void deleteStorageGroup(PartialPath storageGroup) throws MetadataException {
+
+    DeleteTimeSeriesPlan deleteTimeSeriesPlan =
+        SchemaSyncManager.getInstance()
+            .splitDeleteTimeseriesPlanByDevice(storageGroup.concatNode(MULTI_LEVEL_PATH_WILDCARD));
+
     deleteSchemaRegionsInStorageGroup(
         storageGroup, schemaPartitionTable.getSchemaRegionIdsByStorageGroup(storageGroup));
 
@@ -237,8 +243,7 @@ public class LocalConfigNode {
       templateManager.unmarkStorageGroup(template, storageGroup.getFullPath());
     }
     if (SchemaSyncManager.getInstance().isEnableSync()) {
-      SchemaSyncManager.getInstance()
-          .syncMetadataPlan(new DeleteStorageGroupPlan(Collections.singletonList(storageGroup)));
+      SchemaSyncManager.getInstance().syncMetadataPlan(deleteTimeSeriesPlan);
     }
 
     if (!config.isEnableMemControl()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
index 5a3622b1d3..a34b71633e 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
@@ -720,7 +720,11 @@ public class LocalSchemaProcessor {
    * @return A HashSet instance which stores devices paths.
    */
   public Set<PartialPath> getBelongedDevices(PartialPath timeseries) throws MetadataException {
-    return getBelongedSchemaRegion(timeseries).getBelongedDevices(timeseries);
+    Set<PartialPath> result = new TreeSet<>();
+    for (ISchemaRegion schemaRegion : getInvolvedSchemaRegions(timeseries, false)) {
+      result.addAll(schemaRegion.getBelongedDevices(timeseries));
+    }
+    return result;
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
index c2e65729ed..b80af3b504 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
@@ -118,4 +118,16 @@ public class DeleteTimeSeriesPlan extends PhysicalPlan {
   public TSStatus[] getFailingStatus() {
     return StatusUtils.getFailingStatus(results, deletePathList.size());
   }
+
+  @Override
+  public String toString() {
+    return "DeleteTimeSeriesPlan{"
+        + "deletePathList="
+        + deletePathList
+        + ", results="
+        + results
+        + ", partitionFilter="
+        + partitionFilter
+        + '}';
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index f9e1228893..ae8b8d0b1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -139,6 +139,7 @@ public class IoTDB implements IoTDBMBean {
     registerManager.register(CacheHitRatioMonitor.getInstance());
     registerManager.register(CompactionTaskManager.getInstance());
     JMXService.registerMBean(getInstance(), mbeanName);
+    registerManager.register(SenderService.getInstance());
     registerManager.register(WALManager.getInstance());
 
     // in mpp mode we need to start some other services
@@ -188,7 +189,6 @@ public class IoTDB implements IoTDBMBean {
       }
     }
 
-    registerManager.register(SenderService.getInstance());
     registerManager.register(UpgradeSevice.getINSTANCE());
     // in mpp mode we temporarily don't start settle service because it uses StorageEngine directly
     // in itself, but currently we need to use StorageEngineV2 instead of StorageEngine in mpp mode.
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncConstant.java b/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncConstant.java
index a0c63590af..6f24cbdd05 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncConstant.java
@@ -22,41 +22,45 @@ import org.apache.iotdb.rpc.RpcUtils;
 
 public class SyncConstant {
   /** common */
-  public static final String PIPE_LOG_DIR_NAME = "pipe-log";
+  public static final String UNKNOWN_IP = "UNKNOWN IP";
 
-  public static final String SYNC_DIR_NAME_SEPARATOR = "_";
+  public static final String SYNC_SYS_DIR = "sys";
+  public static final String FILE_DATA_DIR_NAME = "file-data";
+
+  // pipe log: serialNumber + SEPARATOR + SUFFIX
+  public static final String PIPE_LOG_DIR_NAME = "pipe-log";
+  public static final String PIPE_LOG_NAME_SEPARATOR = "_";
+  public static final String PIPE_LOG_NAME_SUFFIX = PIPE_LOG_NAME_SEPARATOR + "pipe.log";
+  public static final String COMMIT_LOG_NAME = "commit.log";
+  public static final Long DEFAULT_PIPE_LOG_SIZE_IN_BYTE = 10485760L;
 
   /** sender */
-  public static final String DEFAULT_PIPE_SINK_IP = "127.0.0.1";
 
-  public static final int DEFAULT_PIPE_SINK_PORT = 6670;
+  // dir structure
+  public static final String SENDER_DIR_NAME = "sender";
 
-  public static final String SENDER_PIPE_DIR_NAME = "sender";
-  public static final String FINISH_COLLECT_LOCK_NAME = "finishCollect.lock";
   public static final String HISTORY_PIPE_LOG_DIR_NAME = "history-" + PIPE_LOG_DIR_NAME;
-  public static final Long DEFAULT_HEARTBEAT_DELAY_SECONDS = 10 * 60L;
-  public static final Long DEFAULT_WAITING_FOR_TSFILE_CLOSE_MILLISECONDS = 500L;
-  public static final Long DEFAULT_WAITING_FOR_TSFILE_RETRY_NUMBER = 10L;
-  public static final Long DEFAULT_WAITING_FOR_STOP_MILLISECONDS = 100L;
+  public static final String FINISH_COLLECT_LOCK_NAME = "finishCollect.lock";
   public static final String MODS_OFFSET_FILE_SUFFIX = ".offset";
-  public static final String PIPE_LOG_NAME_SUFFIX = "_pipe.log";
-  public static final Long DEFAULT_PIPE_LOG_SIZE_IN_BYTE = 10485760L;
-  public static final String COMMIT_LOG_NAME = "commit.log";
-
-  public static final String UNKNOWN_IP = "UNKNOWN IP";
 
+  // recover
   public static final String SENDER_LOG_NAME = "senderService.log";
   public static final String PLAN_SERIALIZE_SPLIT_CHARACTER = ",";
-  public static final String SENDER_LOG_SPLIT_CHARACTER = " ";
-  public static final int MESSAGE_LENGTH_LIMIT = 200;
+  public static final String SENDER_LOG_SPLIT_CHARACTER = "#";
 
-  public static String getPipeLogName(long serialNumber) {
-    return serialNumber + PIPE_LOG_NAME_SUFFIX;
-  }
+  // data config
+  public static final String DEFAULT_PIPE_SINK_IP = "127.0.0.1";
+  public static final int DEFAULT_PIPE_SINK_PORT = 6670;
 
-  public static Long getSerialNumberFromPipeLogName(String pipeLogName) {
-    return Long.parseLong(pipeLogName.split("_")[0]);
-  }
+  public static final Long HEARTBEAT_DELAY_SECONDS = 10 * 60L;
+  public static final int CONNECT_TIMEOUT_MILLISECONDS = 1_000;
+  public static final int SOCKET_TIMEOUT_MILLISECONDS = 100_000;
+
+  public static final Long DEFAULT_WAITING_FOR_TSFILE_CLOSE_MILLISECONDS = 500L;
+  public static final Long DEFAULT_WAITING_FOR_TSFILE_RETRY_NUMBER = 10L;
+  public static final Long DEFAULT_WAITING_FOR_STOP_MILLISECONDS = 1000L;
+
+  public static final int MESSAGE_NUMBER_LIMIT = 1; // do not support multi lines now
 
   /** transport */
 
@@ -65,11 +69,9 @@ public class SyncConstant {
       Math.min(64 * 1024 * 1024, RpcUtils.THRIFT_FRAME_MAX_SIZE);
 
   /** receiver */
-  public static final String SYNC_SYS_DIR = "sys";
+  public static final String RECEIVER_DIR_NAME = "receiver";
 
-  public static final String RECEIVER_DIR = "receiver";
   public static final String RECEIVER_LOG_NAME = "receiverService.log";
   public static final String RECEIVER_MSG_LOG_NAME = "receiverMessage.log";
-  public static final String FILE_DATA_DIR_NAME = "file-data";
   public static final String IP_SEPARATOR = "\\.";
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncPathUtil.java b/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncPathUtil.java
index fc38852597..2161b27e1d 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncPathUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncPathUtil.java
@@ -32,33 +32,64 @@ public class SyncPathUtil {
     // forbidding instantiation
   }
 
+  // sync data structure
+  // data/sync
+  // |----sender dir
+  // |      |----sender pipe dir
+  // |             |----history pipe log dir
+  // |             |----realtime pipe log dir
+  // |             |----file data dir
+  // |----receiver dir
+  //        |-----receiver pipe dir
+  //                |----receiver pipe log dir
+  //                |----file data dir
+
   /** sender */
-  public static String getSenderPipeDir(String pipeName, long createTime) {
+  public static String getSenderDir() {
     return IoTDBDescriptor.getInstance().getConfig().getSyncDir()
         + File.separator
-        + SyncConstant.SENDER_PIPE_DIR_NAME
-        + String.format("-%s-%d", pipeName, createTime);
+        + SyncConstant.SENDER_DIR_NAME;
+  }
+
+  public static String getSenderPipeDir(String pipeName, long createTime) {
+    return getSenderDir() + File.separator + getSenderPipeDirName(pipeName, createTime);
+  }
+
+  public static String getSenderPipeDirName(String pipeName, long createTime) {
+    return String.format("%s-%d", pipeName, createTime);
   }
 
-  public static String getSenderHistoryPipeDataDir(String pipeName, long createTime) {
+  public static String getSenderHistoryPipeLogDir(String pipeName, long createTime) {
     return getSenderPipeDir(pipeName, createTime)
         + File.separator
         + SyncConstant.HISTORY_PIPE_LOG_DIR_NAME;
   }
 
-  public static String getSenderRealTimePipeDataDir(String pipeName, long createTime) {
+  public static String getSenderRealTimePipeLogDir(String pipeName, long createTime) {
     return getSenderPipeDir(pipeName, createTime) + File.separator + SyncConstant.PIPE_LOG_DIR_NAME;
   }
 
+  public static String getSenderFileDataDir(String pipeName, long createTime) {
+    return getSenderPipeDir(pipeName, createTime)
+        + File.separator
+        + SyncConstant.FILE_DATA_DIR_NAME;
+  }
+
   /** receiver */
-  public static String getFileDataDirPath(IdentityInfo identityInfo) {
-    return SyncPathUtil.getReceiverFileDataDir(
-        identityInfo.getPipeName(), identityInfo.getAddress(), identityInfo.getCreateTime());
+  public static String getReceiverDir() {
+    return IoTDBDescriptor.getInstance().getConfig().getSyncDir()
+        + File.separator
+        + SyncConstant.RECEIVER_DIR_NAME;
   }
 
-  public static String getPipeLogDirPath(IdentityInfo identityInfo) {
-    return SyncPathUtil.getReceiverPipeLogDir(
-        identityInfo.getPipeName(), identityInfo.getAddress(), identityInfo.getCreateTime());
+  public static String getReceiverPipeDir(String pipeName, String remoteIp, long createTime) {
+    return getReceiverDir()
+        + File.separator
+        + getReceiverPipeDirName(pipeName, remoteIp, createTime);
+  }
+
+  public static String getReceiverPipeDirName(String pipeName, String remoteIp, long createTime) {
+    return String.format("%s-%d-%s", pipeName, createTime, remoteIp);
   }
 
   public static String getReceiverPipeLogDir(String pipeName, String remoteIp, long createTime) {
@@ -73,30 +104,31 @@ public class SyncPathUtil {
         + SyncConstant.FILE_DATA_DIR_NAME;
   }
 
-  public static String getReceiverPipeDir(String pipeName, String remoteIp, long createTime) {
-    return getReceiverDir()
-        + File.separator
-        + getReceiverPipeFolderName(pipeName, remoteIp, createTime);
-  }
-
-  public static String getReceiverPipeFolderName(
-      String pipeName, String remoteIp, long createTime) {
-    return String.format("%s-%d-%s", pipeName, createTime, remoteIp);
+  public static String getFileDataDirPath(IdentityInfo identityInfo) {
+    return SyncPathUtil.getReceiverFileDataDir(
+        identityInfo.getPipeName(), identityInfo.getAddress(), identityInfo.getCreateTime());
   }
 
-  public static String getReceiverDir() {
-    return IoTDBDescriptor.getInstance().getConfig().getSyncDir()
-        + File.separator
-        + SyncConstant.RECEIVER_DIR;
+  public static String getPipeLogDirPath(IdentityInfo identityInfo) {
+    return SyncPathUtil.getReceiverPipeLogDir(
+        identityInfo.getPipeName(), identityInfo.getAddress(), identityInfo.getCreateTime());
   }
 
+  /** common */
   public static String getSysDir() {
     return IoTDBDescriptor.getInstance().getConfig().getSyncDir()
         + File.separator
         + SyncConstant.SYNC_SYS_DIR;
   }
 
-  /** common */
+  public static String getPipeLogName(long serialNumber) {
+    return serialNumber + SyncConstant.PIPE_LOG_NAME_SUFFIX;
+  }
+
+  public static Long getSerialNumberFromPipeLogName(String pipeLogName) {
+    return Long.parseLong(pipeLogName.split(SyncConstant.PIPE_LOG_NAME_SEPARATOR)[0]);
+  }
+
   public static boolean createFile(File file) throws IOException {
     if (!file.getParentFile().exists()) {
       file.getParentFile().mkdirs();
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java
index a8568bd72e..9a89f4dd57 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java
@@ -66,7 +66,7 @@ public class DeletionPipeData extends PipeData {
 
   @Override
   public String toString() {
-    return "DeletionData{" + "deletion=" + deletion + ", serialNumber=" + serialNumber + '}';
+    return "DeletionData{" + "serialNumber=" + serialNumber + ", deletion=" + deletion + '}';
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java
index 88dcba6bd0..212a922f16 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java
@@ -98,7 +98,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
 
     for (File file : logDir.listFiles())
       if (file.getName().endsWith(SyncConstant.PIPE_LOG_NAME_SUFFIX) && file.length() > 0) {
-        startNumbers.add(SyncConstant.getSerialNumberFromPipeLogName(file.getName()));
+        startNumbers.add(SyncPathUtil.getSerialNumberFromPipeLogName(file.getName()));
       }
     if (startNumbers.size() != 0) {
       Collections.sort(startNumbers);
@@ -114,7 +114,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
     }
 
     File writingPipeLog =
-        new File(pipeLogDir, SyncConstant.getPipeLogName(pipeLogStartNumber.peekLast()));
+        new File(pipeLogDir, SyncPathUtil.getPipeLogName(pipeLogStartNumber.peekLast()));
     try {
       List<PipeData> recoverPipeData = parsePipeLog(writingPipeLog);
       int recoverPipeDataSize = recoverPipeData.size();
@@ -157,7 +157,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
     }
 
     File readingPipeLog =
-        new File(pipeLogDir, SyncConstant.getPipeLogName(pipeLogStartNumber.peek()));
+        new File(pipeLogDir, SyncPathUtil.getPipeLogName(pipeLogStartNumber.peek()));
     try {
       List<PipeData> recoverPipeData = parsePipeLog(readingPipeLog);
       int recoverPipeDataSize = recoverPipeData.size();
@@ -214,7 +214,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
     if (outputStream != null) {
       outputStream.close();
     }
-    File newPipeLog = new File(pipeLogDir, SyncConstant.getPipeLogName(startSerialNumber));
+    File newPipeLog = new File(pipeLogDir, SyncPathUtil.getPipeLogName(startSerialNumber));
     SyncPathUtil.createFile(newPipeLog);
 
     outputStream = new DataOutputStream(new FileOutputStream(newPipeLog));
@@ -250,7 +250,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
         outputDeque = inputDeque;
       } else {
         List<PipeData> parsePipeData =
-            parsePipeLog(new File(pipeLogDir, SyncConstant.getPipeLogName(serialNumber)));
+            parsePipeLog(new File(pipeLogDir, SyncPathUtil.getPipeLogName(serialNumber)));
         int parsePipeDataSize = parsePipeData.size();
         outputDeque = new LinkedBlockingDeque<>();
         for (int i = 0; i < parsePipeDataSize; i++) {
@@ -354,7 +354,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
         if (!pipeLogStartNumber.isEmpty() && pipeLogStartNumber.peek() <= commitSerialNumber) {
           try {
             Files.deleteIfExists(
-                new File(pipeLogDir, SyncConstant.getPipeLogName(nowPipeLogStartNumber)).toPath());
+                new File(pipeLogDir, SyncPathUtil.getPipeLogName(nowPipeLogStartNumber)).toPath());
           } catch (IOException e) {
             logger.warn(
                 String.format("Delete %s-pipe.log error, because %s.", nowPipeLogStartNumber, e));
@@ -401,7 +401,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
   }
 
   @Override
-  public void clear() {
+  public void close() {
     try {
       if (outputStream != null) {
         outputStream.close();
@@ -415,12 +415,18 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
       inputDeque = null;
       pipeLogStartNumber = null;
       outputDeque = null;
-      File logDir = new File(pipeLogDir);
-      if (logDir.exists()) {
-        FileUtils.deleteDirectory(logDir);
-      }
     } catch (IOException e) {
-      logger.warn(String.format("Clear pipe log dir %s error, because %s.", pipeLogDir, e));
+      logger.warn(String.format("Close pipe log dir %s error.", pipeLogDir), e);
+    }
+  }
+
+  @Override
+  public void clear() {
+    close();
+
+    File logDir = new File(pipeLogDir);
+    if (logDir.exists()) {
+      FileUtils.deleteDirectory(logDir);
     }
   }
 
@@ -431,7 +437,6 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
         pipeData.add(PipeData.deserialize(inputStream));
       }
     } catch (EOFException e) {
-      logger.info(String.format("Finish parsing pipeLog %s.", file.getPath()));
     } catch (IllegalPathException e) {
       logger.error(String.format("Parsing pipeLog %s error, because %s", file.getPath(), e));
       throw new IOException(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/PipeDataQueue.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/PipeDataQueue.java
index 576a06a4c4..1fb1054b19 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/PipeDataQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/PipeDataQueue.java
@@ -33,5 +33,7 @@ public interface PipeDataQueue {
 
   boolean isEmpty();
 
+  void close();
+
   void clear();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/collector/Collector.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/collector/Collector.java
index 99829641d7..8ea32f5a58 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/collector/Collector.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/collector/Collector.java
@@ -84,7 +84,7 @@ public class Collector {
   }
 
   public void startPipe(String pipeName, String remoteIp, long createTime) {
-    String dir = SyncPathUtil.getReceiverPipeFolderName(pipeName, remoteIp, createTime);
+    String dir = SyncPathUtil.getReceiverPipeDirName(pipeName, remoteIp, createTime);
     synchronized (dir.intern()) {
       if (!taskFutures.containsKey(dir)) {
         ScanTask task = new ScanTask(pipeName, remoteIp, createTime);
@@ -94,7 +94,7 @@ public class Collector {
   }
 
   public void stopPipe(String pipeName, String remoteIp, long createTime) {
-    String dir = SyncPathUtil.getReceiverPipeFolderName(pipeName, remoteIp, createTime);
+    String dir = SyncPathUtil.getReceiverPipeDirName(pipeName, remoteIp, createTime);
     logger.info("try stop task key={}", dir);
     synchronized (dir.intern()) {
       if (taskFutures.containsKey(dir)) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManager.java
index 0ad998ea62..ba0c0f5296 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManager.java
@@ -135,7 +135,7 @@ public class ReceiverManager {
    */
   public synchronized void writePipeMessage(
       String pipeName, String remoteIp, long createTime, PipeMessage message) {
-    String pipeIdentifier = SyncPathUtil.getReceiverPipeFolderName(pipeName, remoteIp, createTime);
+    String pipeIdentifier = SyncPathUtil.getReceiverPipeDirName(pipeName, remoteIp, createTime);
     try {
       log.writePipeMsg(pipeIdentifier, message);
     } catch (IOException e) {
@@ -161,7 +161,7 @@ public class ReceiverManager {
   public synchronized List<PipeMessage> getPipeMessages(
       String pipeName, String remoteIp, long createTime, boolean consume) {
     List<PipeMessage> pipeMessageList = new ArrayList<>();
-    String pipeIdentifier = SyncPathUtil.getReceiverPipeFolderName(pipeName, remoteIp, createTime);
+    String pipeIdentifier = SyncPathUtil.getReceiverPipeDirName(pipeName, remoteIp, createTime);
     if (consume) {
       try {
         log.comsumePipeMsg(pipeIdentifier);
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/SchemaSyncManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/SchemaSyncManager.java
index e2bfb99c1e..6e3686df17 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/SchemaSyncManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/SchemaSyncManager.java
@@ -40,7 +40,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 
-import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.db.metadata.MetadataConstant.ALL_RESULT_NODES;
 
 /**
@@ -80,20 +79,7 @@ public class SchemaSyncManager {
   }
 
   public void syncMetadataPlan(PhysicalPlan plan) {
-    try {
-      switch (plan.getOperatorType()) {
-        case DELETE_STORAGE_GROUP:
-          syncPipe.collectRealTimeMetaData(
-              splitDeleteTimeseriesPlanByDevice(
-                  plan.getPaths().get(0).concatNode(MULTI_LEVEL_PATH_WILDCARD)));
-          break;
-        default:
-          syncPipe.collectRealTimeMetaData(plan);
-          break;
-      }
-    } catch (MetadataException e) {
-
-    }
+    syncPipe.collectRealTimeMetaData(plan);
   }
 
   public void clear() {
@@ -144,7 +130,7 @@ public class SchemaSyncManager {
     return result;
   }
 
-  private DeleteTimeSeriesPlan splitDeleteTimeseriesPlanByDevice(PartialPath pathPattern)
+  public DeleteTimeSeriesPlan splitDeleteTimeseriesPlanByDevice(PartialPath pathPattern)
       throws MetadataException {
     return new DeleteTimeSeriesPlan(splitPathPatternByDevice(pathPattern));
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/IoTDBPipeSink.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/IoTDBPipeSink.java
index 3c5b619a63..4ba9194838 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/IoTDBPipeSink.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/IoTDBPipeSink.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.db.exception.sync.PipeSinkException;
 import org.apache.iotdb.db.sync.conf.SyncConstant;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
+import java.util.Objects;
+
 public class IoTDBPipeSink implements PipeSink {
   private final PipeSink.Type type;
 
@@ -94,4 +96,20 @@ public class IoTDBPipeSink implements PipeSink {
         + port
         + '}';
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    IoTDBPipeSink pipeSink = (IoTDBPipeSink) o;
+    return port == pipeSink.port
+        && type == pipeSink.type
+        && Objects.equals(name, pipeSink.name)
+        && Objects.equals(ip, pipeSink.ip);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(type, name, ip, port);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java
index 82d1ccce9d..94aae7b40b 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.sync.sender.pipe;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.exception.sync.PipeException;
 import org.apache.iotdb.db.sync.pipedata.PipeData;
+import org.apache.iotdb.db.sync.sender.service.SenderService;
 import org.apache.iotdb.db.sync.transport.client.ITransportClient;
 
 /**
@@ -52,6 +53,15 @@ public interface Pipe {
    */
   void drop() throws PipeException;
 
+  /**
+   * Close this pipe, stop collecting data from IoTDB, but do not delete information about this pipe
+   * on disk. Used for {@linkplain SenderService#shutdown(long)}. Do not change the status of this
+   * pipe.
+   *
+   * @throws PipeException Some inside error happens(such as IOException about disk).
+   */
+  void close() throws PipeException;
+
   /**
    * Get the name of this pipe.
    *
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
index 286847916e..3faefbfbdb 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
@@ -41,6 +41,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.locks.ReentrantLock;
 
 public class TsFilePipe implements Pipe {
@@ -73,9 +74,9 @@ public class TsFilePipe implements Pipe {
     this.syncDelOp = syncDelOp;
 
     this.historyQueue =
-        new BufferedPipeDataQueue(SyncPathUtil.getSenderHistoryPipeDataDir(name, createTime));
+        new BufferedPipeDataQueue(SyncPathUtil.getSenderHistoryPipeLogDir(name, createTime));
     this.realTimeQueue =
-        new BufferedPipeDataQueue(SyncPathUtil.getSenderRealTimePipeDataDir(name, createTime));
+        new BufferedPipeDataQueue(SyncPathUtil.getSenderRealTimePipeLogDir(name, createTime));
     this.pipeLog = new TsFilePipeLogger(this);
     this.collectRealTimeDataLock = new ReentrantLock();
 
@@ -298,7 +299,7 @@ public class TsFilePipe implements Pipe {
     status = PipeStatus.DROP;
   }
 
-  private void clear() throws PipeException {
+  private void clear() {
     deregisterMetadata();
     deregisterTsFile();
     isCollectingRealTimeData = false;
@@ -312,6 +313,20 @@ public class TsFilePipe implements Pipe {
     }
   }
 
+  @Override
+  public void close() throws PipeException {
+    if (status == PipeStatus.DROP) {
+      return;
+    }
+
+    deregisterMetadata();
+    deregisterTsFile();
+    isCollectingRealTimeData = false;
+
+    historyQueue.close();
+    realTimeQueue.close();
+  }
+
   @Override
   public String getName() {
     return name;
@@ -331,4 +346,44 @@ public class TsFilePipe implements Pipe {
   public synchronized PipeStatus getStatus() {
     return status;
   }
+
+  @Override
+  public String toString() {
+    return "TsFilePipe{"
+        + "createTime="
+        + createTime
+        + ", name='"
+        + name
+        + '\''
+        + ", pipeSink="
+        + pipeSink
+        + ", dataStartTime="
+        + dataStartTime
+        + ", syncDelOp="
+        + syncDelOp
+        + ", pipeLog="
+        + pipeLog
+        + ", isCollectingRealTimeData="
+        + isCollectingRealTimeData
+        + ", maxSerialNumber="
+        + maxSerialNumber
+        + ", status="
+        + status
+        + '}';
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    TsFilePipe that = (TsFilePipe) o;
+    return createTime == that.createTime
+        && Objects.equals(name, that.name)
+        && Objects.equals(pipeSink, that.pipeSink);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(createTime, name, pipeSink);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/SenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/SenderLogAnalyzer.java
index bb95056a6e..c5124d4dfb 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/SenderLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/SenderLogAnalyzer.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.sync.conf.SyncConstant;
 import org.apache.iotdb.db.sync.conf.SyncPathUtil;
 import org.apache.iotdb.db.sync.sender.pipe.Pipe;
 import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
+import org.apache.iotdb.db.sync.sender.service.MsgManager;
 import org.apache.iotdb.db.sync.sender.service.SenderService;
 
 import java.io.BufferedReader;
@@ -46,7 +47,7 @@ public class SenderLogAnalyzer {
 
   private Pipe runningPipe;
   private Pipe.PipeStatus runningPipeStatus;
-  private String runningMsg;
+  private MsgManager msgManager;
 
   public SenderLogAnalyzer() throws IOException {
     senderLog = new File(SyncPathUtil.getSysDir(), SyncConstant.SENDER_LOG_NAME);
@@ -56,7 +57,7 @@ public class SenderLogAnalyzer {
 
     this.pipeSinks = new HashMap<>();
     this.pipes = new ArrayList<>();
-    this.runningMsg = "";
+    this.msgManager = new MsgManager();
   }
 
   public void recover() throws IOException {
@@ -97,19 +98,20 @@ public class SenderLogAnalyzer {
                         Long.parseLong(parseStrings[1]));
             pipes.add(runningPipe);
             runningPipeStatus = runningPipe.getStatus();
-            runningMsg = "";
+            msgManager.addPipe(runningPipe);
             break;
           case STOP_PIPE: // ignore status check
             runningPipeStatus = Pipe.PipeStatus.STOP;
-            appendMsg(parseStrings);
+            msgManager.recoverMsg(parseStrings);
             break;
           case START_PIPE:
             runningPipeStatus = Pipe.PipeStatus.RUNNING;
-            appendMsg(parseStrings);
+            msgManager.recoverMsg(parseStrings);
             break;
           case DROP_PIPE:
             runningPipeStatus = Pipe.PipeStatus.DROP;
             runningPipe.drop();
+            msgManager.removeAllPipe();
             break;
           default:
             throw new UnsupportedOperationException(
@@ -145,15 +147,6 @@ public class SenderLogAnalyzer {
     br.close();
   }
 
-  private void appendMsg(String[] parseStrings) {
-    if (parseStrings.length == 3) {
-      if (runningMsg.length() > 0) {
-        runningMsg += System.lineSeparator();
-      }
-      runningMsg += parseStrings[2];
-    }
-  }
-
   public Map<String, PipeSink> getRecoveryAllPipeSinks() {
     return pipeSinks;
   }
@@ -166,7 +159,7 @@ public class SenderLogAnalyzer {
     return runningPipe;
   }
 
-  public String getRecoveryRunningMsg() {
-    return runningMsg;
+  public MsgManager getMsgManager() {
+    return msgManager;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/SenderLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/SenderLogger.java
index af522c889a..02f6628f79 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/SenderLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/SenderLogger.java
@@ -60,7 +60,7 @@ public class SenderLogger {
     }
   }
 
-  public void addPipeSink(CreatePipeSinkPlan plan) {
+  public synchronized void addPipeSink(CreatePipeSinkPlan plan) {
     getBufferedWriter();
     try {
       bw.write(Operator.OperatorType.CREATE_PIPESINK.name());
@@ -73,7 +73,7 @@ public class SenderLogger {
     }
   }
 
-  public void dropPipeSink(String pipeSinkName) {
+  public synchronized void dropPipeSink(String pipeSinkName) {
     getBufferedWriter();
     try {
       bw.write(Operator.OperatorType.DROP_PIPESINK.name());
@@ -86,7 +86,7 @@ public class SenderLogger {
     }
   }
 
-  public void addPipe(CreatePipePlan plan, long pipeCreateTime) {
+  public synchronized void addPipe(CreatePipePlan plan, long pipeCreateTime) {
     getBufferedWriter();
     try {
       bw.write(Operator.OperatorType.CREATE_PIPE.name());
@@ -101,7 +101,7 @@ public class SenderLogger {
     }
   }
 
-  public void operatePipe(String pipeName, Operator.OperatorType type) {
+  public synchronized void operatePipe(String pipeName, Operator.OperatorType type) {
     getBufferedWriter();
     try {
       bw.write(type.name());
@@ -114,7 +114,7 @@ public class SenderLogger {
     }
   }
 
-  public void recordMsg(String pipeName, Operator.OperatorType type, String msg) {
+  public synchronized void recordMsg(String pipeName, Operator.OperatorType type, String msg) {
     getBufferedWriter();
     try {
       bw.write(type.name());
@@ -129,7 +129,7 @@ public class SenderLogger {
     }
   }
 
-  public void close() {
+  public synchronized void close() {
     try {
       if (bw != null) {
         bw.close();
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/TsFilePipeLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/TsFilePipeLogger.java
index e6e599005e..40ea57e5bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/TsFilePipeLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/TsFilePipeLogger.java
@@ -46,7 +46,7 @@ public class TsFilePipeLogger {
 
   public TsFilePipeLogger(TsFilePipe tsFilePipe) {
     pipeDir = SyncPathUtil.getSenderPipeDir(tsFilePipe.getName(), tsFilePipe.getCreateTime());
-    tsFileDir = new File(pipeDir, SyncConstant.FILE_DATA_DIR_NAME).getPath();
+    tsFileDir = SyncPathUtil.getSenderFileDataDir(tsFilePipe.getName(), tsFilePipe.getCreateTime());
   }
 
   /** make hard link for tsfile * */
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/MsgManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/MsgManager.java
new file mode 100644
index 0000000000..4853dec5d1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/MsgManager.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.iotdb.db.sync.sender.service;
+
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.sync.conf.SyncConstant;
+import org.apache.iotdb.db.sync.conf.SyncPathUtil;
+import org.apache.iotdb.db.sync.sender.pipe.Pipe;
+import org.apache.iotdb.db.sync.sender.recovery.SenderLogger;
+import org.apache.iotdb.service.transport.thrift.ResponseType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+public class MsgManager {
+  private static final Logger logger = LoggerFactory.getLogger(MsgManager.class);
+
+  private SenderLogger senderLogger;
+
+  private Pipe runningPipe;
+  private Queue<String> Messages;
+
+  public MsgManager() {}
+
+  public MsgManager(SenderLogger senderLogger) {
+    this.senderLogger = senderLogger;
+  }
+
+  public void addPipe(Pipe pipe) {
+    this.runningPipe = pipe;
+    Messages = new ArrayDeque<>();
+  }
+
+  public void removeAllPipe() {
+    runningPipe = null;
+    Messages = null;
+  }
+
+  public synchronized void recordMsg(
+      Pipe pipe, Operator.OperatorType operatorType, ResponseType type, String inputMsg) {
+    if (runningPipe == null) {
+      logger.warn(
+          String.format("No running Pipe for recording msg [%s] %s.", type.name(), inputMsg));
+      return;
+    } else if (!pipe.equals(runningPipe)) {
+      logger.warn(
+          String.format(
+              "Input Pipe %s is not equal running Pipe %s, ignore it.",
+              pipe.getName(), runningPipe.getName()));
+    }
+
+    String msg = String.format("[%s] ", type.name()) + SyncPathUtil.createMsg(inputMsg);
+    if (Messages.size() > SyncConstant.MESSAGE_NUMBER_LIMIT) {
+      Messages.poll();
+    }
+    Messages.offer(msg);
+    if (senderLogger != null) { // not in recover
+      senderLogger.recordMsg(pipe.getName(), operatorType, msg);
+    }
+  }
+
+  public synchronized String getPipeMsg(Pipe pipe) {
+    if (runningPipe == null) {
+      return "";
+    } else if (!pipe.equals(runningPipe) || Messages.size() == 0) {
+      return "";
+    }
+
+    StringBuilder builder = new StringBuilder();
+    int size = Messages.size();
+    for (int i = 0; i < size; i++) {
+      String msg = Messages.poll();
+      if (i < SyncConstant.MESSAGE_NUMBER_LIMIT) {
+        builder.append(msg);
+        //        builder.append(System.lineSeparator()); do not support multi lines now
+      }
+      Messages.offer(msg);
+    }
+    if (size > SyncConstant.MESSAGE_NUMBER_LIMIT) {
+      builder.append(" ...");
+    }
+    //    builder.append("  (for More info, check $IOTDB_HOME$/log/ please.)");
+    return builder.toString();
+  }
+
+  public void recoverMsg(String[] parseStrings) {
+    if (parseStrings.length == 3) {
+      if (Messages.size() > SyncConstant.MESSAGE_NUMBER_LIMIT) {
+        Messages.poll();
+      }
+      Messages.offer(parseStrings[2]);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java
index b4861053ef..116fb65bcb 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.exception.ShutdownException;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
-import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.exception.SyncConnectionException;
 import org.apache.iotdb.db.exception.sync.PipeException;
 import org.apache.iotdb.db.exception.sync.PipeSinkException;
@@ -39,8 +38,6 @@ import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
 import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
 import org.apache.iotdb.db.sync.sender.recovery.SenderLogAnalyzer;
 import org.apache.iotdb.db.sync.sender.recovery.SenderLogger;
-import org.apache.iotdb.db.sync.transport.client.ITransportClient;
-import org.apache.iotdb.db.sync.transport.client.TransportClient;
 import org.apache.iotdb.service.transport.thrift.RequestType;
 import org.apache.iotdb.service.transport.thrift.SyncResponse;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -64,7 +61,7 @@ public class SenderService implements IService {
   private List<Pipe> pipes;
 
   private Pipe runningPipe;
-  private String runningMsg;
+  private MsgManager msgManager;
 
   private TransportHandler transportHandler;
 
@@ -167,13 +164,8 @@ public class SenderService implements IService {
     PipeSink runningPipeSink = getPipeSink(plan.getPipeSinkName());
     runningPipe = parseCreatePipePlan(plan, runningPipeSink, currentTime);
     try {
-      ITransportClient transportClient =
-          new TransportClient(
-              runningPipe,
-              ((IoTDBPipeSink) runningPipeSink).getIp(),
-              ((IoTDBPipeSink) runningPipeSink).getPort());
       transportHandler =
-          new TransportHandler(transportClient, runningPipe.getName(), runningPipe.getCreateTime());
+          TransportHandler.getNewTransportHandler(runningPipe, (IoTDBPipeSink) runningPipeSink);
       sendMsg(RequestType.CREATE);
     } catch (ClassCastException e) {
       logger.error(
@@ -191,7 +183,7 @@ public class SenderService implements IService {
       throw e;
     }
 
-    runningMsg = "";
+    msgManager.addPipe(runningPipe);
     pipes.add(runningPipe);
     senderLogger.addPipe(plan, currentTime);
   }
@@ -243,6 +235,7 @@ public class SenderService implements IService {
       }
 
       runningPipe.drop();
+      msgManager.removeAllPipe();
       sendMsg(RequestType.DROP);
       senderLogger.operatePipe(pipeName, Operator.OperatorType.DROP_PIPE);
     } catch (InterruptedException e) {
@@ -258,7 +251,7 @@ public class SenderService implements IService {
   }
 
   public synchronized String getPipeMsg(Pipe pipe) {
-    return pipe == runningPipe ? runningMsg : "";
+    return msgManager.getPipeMsg(pipe);
   }
 
   private void checkRunningPipeExistAndName(String pipeName) throws PipeException {
@@ -291,7 +284,7 @@ public class SenderService implements IService {
 
   public synchronized void receiveMsg(SyncResponse response) {
     if (runningPipe == null || runningPipe.getStatus() == Pipe.PipeStatus.DROP) {
-      logger.warn(String.format("No running pipe for receiving msg %s.", response));
+      logger.info(String.format("No running pipe for receiving msg %s.", response));
       return;
     }
     switch (response.type) {
@@ -302,21 +295,18 @@ public class SenderService implements IService {
         try {
           stopPipe(runningPipe.getName());
         } catch (PipeException e) {
-          logger.error(
+          logger.warn(
               String.format(
                   "Stop pipe %s when meeting error in sender service.", runningPipe.getName()),
               e);
         }
       case WARN:
-        if (runningMsg.length() > 0) {
-          runningMsg += System.lineSeparator();
-        }
-        runningMsg += (response.type.name() + " " + response.msg);
-        senderLogger.recordMsg(
-            runningPipe.getName(),
+        msgManager.recordMsg(
+            runningPipe,
             runningPipe.getStatus() == Pipe.PipeStatus.RUNNING
                 ? Operator.OperatorType.START_PIPE
                 : Operator.OperatorType.STOP_PIPE,
+            response.type,
             response.msg);
         break;
     }
@@ -328,7 +318,7 @@ public class SenderService implements IService {
     this.pipeSinks = new HashMap<>();
     this.pipes = new ArrayList<>();
     this.senderLogger = new SenderLogger();
-    this.runningMsg = "";
+    this.msgManager = new MsgManager(senderLogger);
 
     File senderLog = new File(SyncPathUtil.getSysDir(), SyncConstant.SENDER_LOG_NAME);
     if (senderLog.exists()) {
@@ -346,33 +336,35 @@ public class SenderService implements IService {
     if (runningPipe != null && !Pipe.PipeStatus.DROP.equals(runningPipe.getStatus())) {
       try {
         runningPipe.stop();
+        transportHandler.stop();
       } catch (PipeException e) {
         logger.warn(
-            String.format(
-                "Stop pipe %s error when stop Sender Service, because %s.",
-                runningPipe.getName(), e));
+            String.format("Stop pipe %s error when stop Sender Service.", runningPipe.getName()),
+            e);
       }
     }
   }
 
   @Override
   public void shutdown(long milliseconds) throws ShutdownException {
+    pipeSinks = null;
+    pipes = null;
+    msgManager = null;
+    senderLogger.close();
+
     if (runningPipe != null && !Pipe.PipeStatus.DROP.equals(runningPipe.getStatus())) {
       try {
         runningPipe.stop();
-      } catch (PipeException e) {
+        transportHandler.close();
+        runningPipe.close();
+      } catch (PipeException | InterruptedException e) {
         logger.warn(
             String.format(
-                "Stop pipe %s error when shutdown Sender Service, because %s.",
-                runningPipe.getName(), e));
+                "Stop pipe %s error when shutdown Sender Service.", runningPipe.getName()),
+            e);
         throw new ShutdownException(e);
       }
     }
-
-    pipeSinks = null;
-    pipes = null;
-    runningMsg = null;
-    senderLogger.close();
   }
 
   @Override
@@ -380,36 +372,21 @@ public class SenderService implements IService {
     return ServiceType.SENDER_SERVICE;
   }
 
-  private void recover() throws IOException, InterruptedException {
+  private void recover() throws IOException {
     SenderLogAnalyzer analyzer = new SenderLogAnalyzer();
     analyzer.recover();
     this.pipeSinks = analyzer.getRecoveryAllPipeSinks();
     this.pipes = analyzer.getRecoveryAllPipes();
     this.runningPipe = analyzer.getRecoveryRunningPipe();
-    this.runningMsg = analyzer.getRecoveryRunningMsg();
+    this.msgManager = analyzer.getMsgManager();
 
-    if (runningPipe != null) {
-      IoTDBPipeSink pipeSink = (IoTDBPipeSink) runningPipe.getPipeSink();
-      ITransportClient transportClient =
-          new TransportClient(runningPipe, pipeSink.getIp(), pipeSink.getPort());
+    if (runningPipe != null && !Pipe.PipeStatus.DROP.equals(runningPipe.getStatus())) {
       this.transportHandler =
-          new TransportHandler(transportClient, runningPipe.getName(), runningPipe.getCreateTime());
+          TransportHandler.getNewTransportHandler(
+              runningPipe, (IoTDBPipeSink) runningPipe.getPipeSink());
       if (Pipe.PipeStatus.RUNNING.equals(runningPipe.getStatus())) {
         transportHandler.start();
-      } else if (Pipe.PipeStatus.DROP.equals(runningPipe.getStatus())) {
-        transportHandler.close();
       }
     }
   }
-
-  /** test */
-  @TestOnly
-  public Pipe getRunningPipe() {
-    return runningPipe;
-  }
-
-  @TestOnly
-  public void setTransportHandler(TransportHandler handler) {
-    this.transportHandler = handler;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java
index ed71629c7c..f510dd52da 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java
@@ -21,9 +21,13 @@ package org.apache.iotdb.db.sync.sender.service;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.exception.SyncConnectionException;
 import org.apache.iotdb.db.sync.conf.SyncConstant;
+import org.apache.iotdb.db.sync.sender.pipe.IoTDBPipeSink;
+import org.apache.iotdb.db.sync.sender.pipe.Pipe;
 import org.apache.iotdb.db.sync.transport.client.ITransportClient;
+import org.apache.iotdb.db.sync.transport.client.TransportClient;
 import org.apache.iotdb.service.transport.thrift.RequestType;
 import org.apache.iotdb.service.transport.thrift.SyncRequest;
 import org.apache.iotdb.service.transport.thrift.SyncResponse;
@@ -40,22 +44,23 @@ import java.util.concurrent.TimeUnit;
 
 public class TransportHandler {
   private static final Logger logger = LoggerFactory.getLogger(TransportHandler.class);
+  private static TransportHandler DEBUG_TRANSPORT_HANDLER = null; // test only
 
-  private final String pipeName;
+  private String pipeName;
+  private long createTime;
   private final String localIp;
-  private final long createTime;
-  private final ITransportClient transportClient;
+  protected ITransportClient transportClient;
 
-  private final ExecutorService transportExecutorService;
+  protected ExecutorService transportExecutorService;
   private Future transportFuture;
 
-  private final ScheduledExecutorService heartbeatExecutorService;
+  protected ScheduledExecutorService heartbeatExecutorService;
   private Future heartbeatFuture;
 
-  public TransportHandler(ITransportClient transportClient, String pipeName, long createTime) {
-    this.pipeName = pipeName;
-    this.createTime = createTime;
-    this.transportClient = transportClient;
+  public TransportHandler(Pipe pipe, IoTDBPipeSink pipeSink) {
+    this.pipeName = pipe.getName();
+    this.createTime = pipe.getCreateTime();
+    this.transportClient = new TransportClient(pipe, pipeSink.getIp(), pipeSink.getPort());
 
     this.transportExecutorService =
         IoTDBThreadPoolFactory.newSingleThreadExecutor(
@@ -80,8 +85,8 @@ public class TransportHandler {
     heartbeatFuture =
         heartbeatExecutorService.scheduleWithFixedDelay(
             this::sendHeartbeat,
-            SyncConstant.DEFAULT_HEARTBEAT_DELAY_SECONDS,
-            SyncConstant.DEFAULT_HEARTBEAT_DELAY_SECONDS,
+            SyncConstant.HEARTBEAT_DELAY_SECONDS,
+            SyncConstant.HEARTBEAT_DELAY_SECONDS,
             TimeUnit.SECONDS);
   }
 
@@ -124,4 +129,24 @@ public class TransportHandler {
               pipeName, e));
     }
   }
+
+  public static TransportHandler getNewTransportHandler(Pipe pipe, IoTDBPipeSink pipeSink) {
+    if (DEBUG_TRANSPORT_HANDLER == null) {
+      return new TransportHandler(pipe, pipeSink);
+    }
+    DEBUG_TRANSPORT_HANDLER.resetTransportClient(pipe); // test only
+    return DEBUG_TRANSPORT_HANDLER;
+  }
+
+  /** test */
+  @TestOnly
+  public static void setDebugTransportHandler(TransportHandler transportHandler) {
+    DEBUG_TRANSPORT_HANDLER = transportHandler;
+  }
+
+  @TestOnly
+  protected void resetTransportClient(Pipe pipe) {
+    this.pipeName = pipe.getName();
+    this.createTime = pipe.getCreateTime();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClient.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClient.java
index e75ca777b2..c495a7b256 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClient.java
@@ -23,13 +23,13 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.SyncConnectionException;
 import org.apache.iotdb.db.sync.conf.SyncConstant;
-import org.apache.iotdb.db.sync.conf.SyncPathUtil;
 import org.apache.iotdb.db.sync.pipedata.PipeData;
 import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
 import org.apache.iotdb.db.sync.sender.pipe.Pipe;
 import org.apache.iotdb.db.sync.sender.service.SenderService;
 import org.apache.iotdb.db.sync.transport.conf.TransportConstant;
 import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
 import org.apache.iotdb.service.transport.thrift.IdentityInfo;
 import org.apache.iotdb.service.transport.thrift.MetaInfo;
 import org.apache.iotdb.service.transport.thrift.RequestType;
@@ -44,6 +44,7 @@ import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,8 +73,6 @@ public class TransportClient implements ITransportClient {
 
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
-  private static final int TIMEOUT_MS = 2000_000;
-
   private static final int TRANSFER_BUFFER_SIZE_IN_BYTES = 1 * 1024 * 1024;
 
   private TTransport transport = null;
@@ -126,7 +125,14 @@ public class TransportClient implements ITransportClient {
     }
 
     try {
-      transport = RpcTransportFactory.INSTANCE.getTransport(ipAddress, port, TIMEOUT_MS);
+      transport =
+          RpcTransportFactory.INSTANCE.getTransport(
+              new TSocket(
+                  TConfigurationConst.defaultTConfiguration,
+                  ipAddress,
+                  port,
+                  SyncConstant.SOCKET_TIMEOUT_MILLISECONDS,
+                  SyncConstant.CONNECT_TIMEOUT_MILLISECONDS));
       TProtocol protocol;
       if (config.isRpcThriftCompressionEnable()) {
         protocol = new TCompactProtocol(transport);
@@ -461,14 +467,14 @@ public class TransportClient implements ITransportClient {
       while (!Thread.currentThread().isInterrupted()) {
         PipeData pipeData = pipe.take();
         if (!senderTransport(pipeData)) {
-          logger.warn(String.format("Can not transfer pipedata %s, skip it.", pipeData));
+          logger.error(String.format("Can not transfer pipedata %s, skip it.", pipeData));
           // can do something.
           SenderService.getInstance()
               .receiveMsg(
                   new SyncResponse(
                       ResponseType.WARN,
-                      SyncPathUtil.createMsg(
-                          String.format("Transfer piepdata %s error, skip it.", pipeData))));
+                      String.format(
+                          "Transfer piepdata %s error, skip it.", pipeData.getSerialNumber())));
           continue;
         }
         pipe.commit();
@@ -482,10 +488,9 @@ public class TransportClient implements ITransportClient {
           .receiveMsg(
               new SyncResponse(
                   ResponseType.ERROR,
-                  SyncPathUtil.createMsg(
-                      String.format(
-                          "Can not connect to %s:%d, because %s, please check receiver and internet.",
-                          ipAddress, port, e.getMessage()))));
+                  String.format(
+                      "Can not connect to %s:%d, please check receiver and Internet.",
+                      ipAddress, port)));
     } finally {
       close();
     }
@@ -504,7 +509,13 @@ public class TransportClient implements ITransportClient {
       }
 
       try (TTransport heartbeatTransport =
-          RpcTransportFactory.INSTANCE.getTransport(ipAddress, port, TIMEOUT_MS)) {
+          RpcTransportFactory.INSTANCE.getTransport(
+              new TSocket(
+                  TConfigurationConst.defaultTConfiguration,
+                  ipAddress,
+                  port,
+                  SyncConstant.SOCKET_TIMEOUT_MILLISECONDS,
+                  SyncConstant.CONNECT_TIMEOUT_MILLISECONDS))) {
         TProtocol protocol;
         if (config.isRpcThriftCompressionEnable()) {
           protocol = new TCompactProtocol(heartbeatTransport);
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java b/server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java
index 4a9b17eafe..c051e85e77 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java
@@ -73,7 +73,7 @@ public class BufferedPipeDataQueueTest {
       DataOutputStream pipeLogOutput1 =
           new DataOutputStream(
               new FileOutputStream(
-                  new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(0)), false));
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(0)), false));
       for (int i = 0; i < 4; i++) {
         new TsFilePipeData("", i).serialize(pipeLogOutput1);
       }
@@ -82,7 +82,7 @@ public class BufferedPipeDataQueueTest {
       DataOutputStream pipeLogOutput2 =
           new DataOutputStream(
               new FileOutputStream(
-                  new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(4)), false));
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(4)), false));
       for (int i = 4; i < 11; i++) {
         new TsFilePipeData("", i).serialize(pipeLogOutput2);
       }
@@ -91,7 +91,7 @@ public class BufferedPipeDataQueueTest {
       DataOutputStream pipeLogOutput3 =
           new DataOutputStream(
               new FileOutputStream(
-                  new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(11)), false));
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(11)), false));
       pipeLogOutput3.close();
       // recovery
       BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
@@ -210,7 +210,7 @@ public class BufferedPipeDataQueueTest {
       DataOutputStream pipeLogOutput1 =
           new DataOutputStream(
               new FileOutputStream(
-                  new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(0)), false));
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(0)), false));
       for (int i = 0; i < 4; i++) {
         PipeData pipeData = new TsFilePipeData("fake" + i, i);
         pipeDataList.add(pipeData);
@@ -221,7 +221,7 @@ public class BufferedPipeDataQueueTest {
       DataOutputStream pipeLogOutput2 =
           new DataOutputStream(
               new FileOutputStream(
-                  new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(4)), false));
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(4)), false));
       for (int i = 4; i < 8; i++) {
         PipeData pipeData =
             new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 99), i);
@@ -239,7 +239,7 @@ public class BufferedPipeDataQueueTest {
       DataOutputStream pipeLogOutput3 =
           new DataOutputStream(
               new FileOutputStream(
-                  new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(11)), false));
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(11)), false));
       pipeLogOutput3.close();
       // recovery
       BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
@@ -300,7 +300,7 @@ public class BufferedPipeDataQueueTest {
       DataOutputStream pipeLogOutput1 =
           new DataOutputStream(
               new FileOutputStream(
-                  new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(0)), false));
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(0)), false));
       for (int i = 0; i < 4; i++) {
         PipeData pipeData = new TsFilePipeData("fake" + i, i);
         pipeDataList.add(pipeData);
@@ -311,7 +311,7 @@ public class BufferedPipeDataQueueTest {
       DataOutputStream pipeLogOutput2 =
           new DataOutputStream(
               new FileOutputStream(
-                  new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(4)), false));
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(4)), false));
       for (int i = 4; i < 8; i++) {
         PipeData pipeData =
             new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 99), i);
@@ -329,7 +329,7 @@ public class BufferedPipeDataQueueTest {
       DataOutputStream pipeLogOutput3 =
           new DataOutputStream(
               new FileOutputStream(
-                  new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(11)), false));
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(11)), false));
       pipeLogOutput3.close();
       // recovery
       BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
@@ -387,7 +387,7 @@ public class BufferedPipeDataQueueTest {
       DataOutputStream pipeLogOutput1 =
           new DataOutputStream(
               new FileOutputStream(
-                  new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(0)), false));
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(0)), false));
       for (int i = 0; i < 4; i++) {
         PipeData pipeData = new TsFilePipeData("fake" + i, i);
         pipeDataList.add(pipeData);
@@ -398,7 +398,7 @@ public class BufferedPipeDataQueueTest {
       DataOutputStream pipeLogOutput2 =
           new DataOutputStream(
               new FileOutputStream(
-                  new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(4)), false));
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(4)), false));
       for (int i = 4; i < 8; i++) {
         PipeData pipeData =
             new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 99), i);
@@ -468,7 +468,7 @@ public class BufferedPipeDataQueueTest {
       DataOutputStream pipeLogOutput1 =
           new DataOutputStream(
               new FileOutputStream(
-                  new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(0)), false));
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(0)), false));
       for (int i = 0; i < 4; i++) {
         PipeData pipeData = new TsFilePipeData("fake" + i, i);
         pipeDataList.add(pipeData);
@@ -479,7 +479,7 @@ public class BufferedPipeDataQueueTest {
       DataOutputStream pipeLogOutput2 =
           new DataOutputStream(
               new FileOutputStream(
-                  new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(4)), false));
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(4)), false));
       for (int i = 4; i < 8; i++) {
         PipeData pipeData =
             new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 99), i);
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzerTest.java
index ac41da744e..ae47d4d8d7 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzerTest.java
@@ -87,8 +87,8 @@ public class ReceiverLogAnalyzerTest {
 
   @Test
   public void testMessageLog() {
-    String pipeIdentifier1 = SyncPathUtil.getReceiverPipeFolderName(pipe1, ip1, createdTime1);
-    String pipeIdentifier2 = SyncPathUtil.getReceiverPipeFolderName(pipe2, ip2, createdTime2);
+    String pipeIdentifier1 = SyncPathUtil.getReceiverPipeDirName(pipe1, ip1, createdTime1);
+    String pipeIdentifier2 = SyncPathUtil.getReceiverPipeDirName(pipe2, ip2, createdTime2);
     try {
       ReceiverLog log = new ReceiverLog();
       PipeMessage info = new PipeMessage(PipeMessage.MsgType.INFO, "info");