You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/03/02 14:02:18 UTC

[iotdb] branch new_sync updated: [To new_sync][IOTDB-1907] implement customized sync process: pipeDataQueue (#5123)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch new_sync
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/new_sync by this push:
     new b3bff7c  [To new_sync][IOTDB-1907] implement customized sync process: pipeDataQueue (#5123)
b3bff7c is described below

commit b3bff7cd8c3fcd29c6fc7ddba0b8dab3cc5b3778
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Wed Mar 2 22:01:22 2022 +0800

    [To new_sync][IOTDB-1907] implement customized sync process: pipeDataQueue (#5123)
---
 .../sync/IoTDBSyncReceiverCollectorIT.java         |   6 +-
 .../BufferedPipeDataBlockingQueue.java             |   2 +-
 .../apache/iotdb/db/newsync/conf/SyncConstant.java |  58 +++
 .../db/newsync/{utils => conf}/SyncPathUtil.java   |  15 +-
 .../apache/iotdb/db/newsync/pipedata/PipeData.java |  28 +-
 .../iotdb/db/newsync/pipedata/TsFilePipeData.java  |  50 +-
 .../pipedata/queue/BufferedPipeDataQueue.java      | 428 ++++++++++++++++
 .../queue/PipeDataQueue.java}                      |  25 +-
 .../iotdb/db/newsync/receiver/ReceiverService.java |   2 +-
 .../db/newsync/receiver/collector/Collector.java   |   6 +-
 .../db/newsync/receiver/recovery/ReceiverLog.java  |   4 +-
 .../receiver/recovery/ReceiverLogAnalyzer.java     |   4 +-
 .../db/newsync/sender/conf/.SenderConf.java.swp    | Bin 12288 -> 0 bytes
 .../iotdb/db/newsync/sender/conf/SenderConf.java   |  63 ---
 .../db/newsync/sender/pipe/IoTDBPipeSink.java      |   6 +-
 .../iotdb/db/newsync/sender/pipe/TsFilePipe.java   |  19 +-
 .../newsync/sender/recovery/SenderLogAnalyzer.java |   7 +-
 .../db/newsync/sender/recovery/SenderLogger.java   |  11 +-
 .../sender/recovery/TsFilePipeLogAnalyzer.java     |  20 +-
 .../{TsFilePipeLog.java => TsFilePipeLogger.java}  |  75 ++-
 .../db/newsync/sender/service/SenderService.java   |   5 +-
 .../org/apache/iotdb/db/qp/logical/Operator.java   |   2 +
 .../iotdb/db/qp/physical/sys/CreatePipePlan.java   |  20 +-
 .../db/qp/physical/sys/CreatePipeSinkPlan.java     |  18 +-
 .../pipedata/BufferedPipeDataQueueTest.java        | 542 +++++++++++++++++++++
 .../iotdb/db/newsync/pipedata/PipeDataTest.java    |   4 +
 26 files changed, 1210 insertions(+), 210 deletions(-)

diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverCollectorIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverCollectorIT.java
index 1b13d13..64af3c7 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverCollectorIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverCollectorIT.java
@@ -22,14 +22,14 @@ import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.newsync.conf.BufferedPipeDataBlockingQueue;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
 import org.apache.iotdb.db.newsync.pipedata.DeletionPipeData;
 import org.apache.iotdb.db.newsync.pipedata.PipeData;
 import org.apache.iotdb.db.newsync.pipedata.SchemaPipeData;
 import org.apache.iotdb.db.newsync.pipedata.TsFilePipeData;
 import org.apache.iotdb.db.newsync.receiver.collector.Collector;
-import org.apache.iotdb.db.newsync.utils.BufferedPipeDataBlockingQueue;
-import org.apache.iotdb.db.newsync.utils.SyncConstant;
-import org.apache.iotdb.db.newsync.utils.SyncPathUtil;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/utils/BufferedPipeDataBlockingQueue.java b/server/src/main/java/org/apache/iotdb/db/newsync/conf/BufferedPipeDataBlockingQueue.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/newsync/utils/BufferedPipeDataBlockingQueue.java
rename to server/src/main/java/org/apache/iotdb/db/newsync/conf/BufferedPipeDataBlockingQueue.java
index 537bc4e..dae6932 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/utils/BufferedPipeDataBlockingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/conf/BufferedPipeDataBlockingQueue.java
@@ -17,7 +17,7 @@
  * under the License.
  *
  */
-package org.apache.iotdb.db.newsync.utils;
+package org.apache.iotdb.db.newsync.conf;
 
 import org.apache.iotdb.db.newsync.pipedata.PipeData;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncConstant.java b/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncConstant.java
new file mode 100644
index 0000000..730c382
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncConstant.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.conf;
+
+public class SyncConstant {
+  /** sender */
+  public static final String DEFAULT_PIPE_SINK_IP = "127.0.0.1";
+
+  public static final int DEFAULT_PIPE_SINK_PORT = 6670;
+
+  public static final String SENDER_PIPE_DIR_NAME = "sender";
+  public static final String FINISH_COLLECT_LOCK_NAME = "finishCollect.lock";
+  public static final Long DEFAULT_WAITING_FOR_TSFILE_CLOSE_MILLISECONDS = 500L;
+  public static final Long DEFAULT_WAITING_FOR_TSFILE_RETRY_NUMBER = 10L;
+  public static final Long DEFAULT_WAITING_FOR_DEREGISTER_MILLISECONDS = 100L;
+  public static final String MODS_OFFSET_FILE_SUFFIX = ".offset";
+  public static final String PIPE_LOG_NAME_SUFFIX = "-pipe.log";
+  public static final Long DEFAULT_PIPE_LOG_SIZE_IN_BYTE = 10485760L;
+  public static final String HISTORY_PIPE_LOG_NAME = PIPE_LOG_NAME_SUFFIX + ".history";
+  public static final String COMMIT_LOG_NAME = "commit.log";
+
+  public static final String SENDER_LOG_NAME = "senderService.log";
+  public static final String PLAN_SERIALIZE_SPLIT_CHARACTER = ",";
+  public static final String SENDER_LOG_SPLIT_CHARACTER = " ";
+
+  public static String getPipeLogName(long serialNumber) {
+    return serialNumber + PIPE_LOG_NAME_SUFFIX;
+  }
+
+  public static Long getSerialNumberFromPipeLogName(String pipeLogName) {
+    return Long.parseLong(pipeLogName.split("-")[0]);
+  }
+
+  /** receiver */
+  public static final String SYNC_SYS_DIR = "sys";
+
+  public static final String RECEIVER_DIR = "receiver";
+  public static final String RECEIVER_LOG_NAME = "receiverService.log";
+  public static final String PIPE_LOG_DIR_NAME = "pipe-log";
+  public static final String FILE_DATA_DIR_NAME = "file-data";
+  public static final String COLLECTOR_SUFFIX = ".collector";
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/utils/SyncPathUtil.java b/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncPathUtil.java
similarity index 81%
rename from server/src/main/java/org/apache/iotdb/db/newsync/utils/SyncPathUtil.java
rename to server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncPathUtil.java
index f57a080..9d20799 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/utils/SyncPathUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncPathUtil.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.newsync.utils;
+package org.apache.iotdb.db.newsync.conf;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 
@@ -24,16 +24,25 @@ import java.io.File;
 
 /** Util for path generation in sync module */
 public class SyncPathUtil {
+  /** sender */
+  public static String getSenderPipeDir(String pipeName, long createTime) {
+    return IoTDBDescriptor.getInstance().getConfig().getNewSyncDir()
+        + File.separator
+        + SyncConstant.SENDER_PIPE_DIR_NAME
+        + String.format("-%s-%d", pipeName, createTime);
+  }
+
+  /** receiver */
   public static String getReceiverPipeLogDir(String pipeName, String remoteIp, long createTime) {
     return getReceiverPipeDir(pipeName, remoteIp, createTime)
         + File.separator
-        + SyncConstant.PIPELOG_DIR_NAME;
+        + SyncConstant.PIPE_LOG_DIR_NAME;
   }
 
   public static String getReceiverFileDataDir(String pipeName, String remoteIp, long createTime) {
     return getReceiverPipeDir(pipeName, remoteIp, createTime)
         + File.separator
-        + SyncConstant.FILEDATA_DIR_NAME;
+        + SyncConstant.FILE_DATA_DIR_NAME;
   }
 
   public static String getReceiverPipeDir(String pipeName, String remoteIp, long createTime) {
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/PipeData.java b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/PipeData.java
index 2e16115..c8d9389 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/PipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/PipeData.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.newsync.receiver.load.ILoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -32,7 +34,7 @@ import java.io.IOException;
 public abstract class PipeData {
   private static final Logger logger = LoggerFactory.getLogger(PipeData.class);
 
-  protected final long serialNumber;
+  protected long serialNumber;
 
   public PipeData(long serialNumber) {
     this.serialNumber = serialNumber;
@@ -42,17 +44,9 @@ public abstract class PipeData {
     return serialNumber;
   }
 
-  //    abstract public Loader.Type getLoaderType() {
-  //      if (tsFilePath != null) {
-  //        return Loader.Type.TsFile;
-  //      } else if (deletion != null) {
-  //        return Loader.Type.Deletion;
-  //      } else if (plan != null) {
-  //        return Loader.Type.PhysicalPlan;
-  //      }
-  //      logger.error("Wrong type for transport type.");
-  //      return null;
-  //    }
+  public void setSerialNumber(long serialNumber) {
+    this.serialNumber = serialNumber;
+  }
 
   public abstract Type getType();
 
@@ -65,6 +59,12 @@ public abstract class PipeData {
     return serializeSize;
   }
 
+  public byte[] serialize() throws IOException {
+    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+    serialize(new DataOutputStream(byteStream));
+    return byteStream.toByteArray();
+  }
+
   public static PipeData deserialize(DataInputStream stream)
       throws IOException, IllegalPathException {
     Type type = Type.values()[stream.readByte()];
@@ -82,6 +82,10 @@ public abstract class PipeData {
     }
   }
 
+  public static PipeData deserialize(byte[] bytes) throws IllegalPathException, IOException {
+    return deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
+  }
+
   public abstract ILoader createLoader();
 
   public abstract void sendToTransport();
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/TsFilePipeData.java b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/TsFilePipeData.java
index 3ff101f..aec255d 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/TsFilePipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/TsFilePipeData.java
@@ -21,9 +21,9 @@ package org.apache.iotdb.db.newsync.pipedata;
 
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
 import org.apache.iotdb.db.newsync.receiver.load.ILoader;
 import org.apache.iotdb.db.newsync.receiver.load.TsFileLoader;
-import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.slf4j.Logger;
@@ -48,6 +48,10 @@ public class TsFilePipeData extends PipeData {
     this.tsFilePath = tsFilePath;
   }
 
+  public void setTsFilePath(String tsFilePath) {
+    this.tsFilePath = tsFilePath;
+  }
+
   @Override
   public Type getType() {
     return Type.TSFILE;
@@ -77,20 +81,39 @@ public class TsFilePipeData extends PipeData {
     }
   }
 
+  public List<File> getTsFiles() throws FileNotFoundException {
+    File tsFile = new File(tsFilePath).getAbsoluteFile();
+    File resource = new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
+    File mods = new File(tsFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX);
+
+    List<File> files = new ArrayList<>();
+    if (!tsFile.exists()) {
+      throw new FileNotFoundException(String.format("Can not find %s.", tsFile.getAbsolutePath()));
+    }
+    files.add(tsFile);
+    if (resource.exists()) {
+      files.add(resource);
+    }
+    if (mods.exists()) {
+      files.add(mods);
+    }
+    return files;
+  }
+
   private boolean waitForTsFileClose() {
-    for (int i = 0; i < SenderConf.defaultWaitingForTsFileRetryNumber; i++) {
+    for (int i = 0; i < SyncConstant.DEFAULT_WAITING_FOR_TSFILE_RETRY_NUMBER; i++) {
       if (isTsFileClosed()) {
         return true;
       }
       try {
-        Thread.sleep(SenderConf.defaultWaitingForTsFileCloseMilliseconds);
+        Thread.sleep(SyncConstant.DEFAULT_WAITING_FOR_TSFILE_CLOSE_MILLISECONDS);
       } catch (InterruptedException e) {
         logger.warn(String.format("Be Interrupted when waiting for tsfile %s closed", tsFilePath));
       }
       logger.info(
           String.format(
               "Waiting for tsfile %s close, retry %d / %d.",
-              tsFilePath, (i + 1), SenderConf.defaultWaitingForTsFileRetryNumber));
+              tsFilePath, (i + 1), SyncConstant.DEFAULT_WAITING_FOR_TSFILE_RETRY_NUMBER));
     }
     return false;
   }
@@ -101,25 +124,6 @@ public class TsFilePipeData extends PipeData {
     return resource.exists();
   }
 
-  public List<File> getTsFiles() throws FileNotFoundException {
-    File tsFile = new File(tsFilePath).getAbsoluteFile();
-    File resource = new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
-    File mods = new File(tsFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX);
-
-    List<File> files = new ArrayList<>();
-    if (!tsFile.exists()) {
-      throw new FileNotFoundException(String.format("Can not find %s.", tsFile.getAbsolutePath()));
-    }
-    files.add(tsFile);
-    if (resource.exists()) {
-      files.add(resource);
-    }
-    if (mods.exists()) {
-      files.add(mods);
-    }
-    return files;
-  }
-
   @Override
   public String toString() {
     return "TsFilePipeData{"
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/queue/BufferedPipeDataQueue.java b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/queue/BufferedPipeDataQueue.java
new file mode 100644
index 0000000..62020fd
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/queue/BufferedPipeDataQueue.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.pipedata.queue;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.pipedata.PipeData;
+import org.apache.iotdb.db.newsync.pipedata.TsFilePipeData;
+import org.apache.iotdb.db.utils.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+
+public class BufferedPipeDataQueue implements PipeDataQueue {
+  private static final Logger logger = LoggerFactory.getLogger(BufferedPipeDataQueue.class);
+
+  private final String pipeLogDir;
+
+  /** input */
+  private long lastMaxSerialNumber;
+
+  private BlockingDeque<PipeData> inputDeque;
+
+  private BlockingDeque<Long> pipeLogStartNumber;
+  private DataOutputStream outputStream;
+  private long currentPipeLogSize;
+
+  /** output */
+  private final Object waitLock = new Object();
+
+  private BlockingDeque<PipeData> outputDeque;
+
+  private long pullSerialNumber;
+  private long commitSerialNumber;
+  private DataOutputStream commitLogWriter;
+  private long currentCommitLogSize;
+
+  public BufferedPipeDataQueue(String pipeLogDir) {
+    this.pipeLogDir = pipeLogDir;
+
+    this.lastMaxSerialNumber = 0;
+    this.pipeLogStartNumber = new LinkedBlockingDeque<>();
+
+    this.outputDeque = new LinkedBlockingDeque<>();
+    this.commitSerialNumber = Long.MIN_VALUE;
+
+    recover();
+  }
+
+  /** recover */
+  private void recover() {
+    if (!new File(pipeLogDir).exists()) {
+      return;
+    }
+
+    recoverPipeLogStartNumber();
+    recoverLastMaxSerialNumber();
+    recoverCommitSerialNumber();
+    recoverOutputDeque();
+  }
+
+  private void recoverPipeLogStartNumber() {
+    File logDir = new File(pipeLogDir);
+    List<Long> startNumbers = new ArrayList<>();
+
+    for (File file : logDir.listFiles())
+      if (file.getName().endsWith(SyncConstant.PIPE_LOG_NAME_SUFFIX)) {
+        startNumbers.add(SyncConstant.getSerialNumberFromPipeLogName(file.getName()));
+      }
+    if (startNumbers.size() != 0) {
+      Collections.sort(startNumbers);
+      for (Long startTime : startNumbers) {
+        pipeLogStartNumber.offer(startTime);
+      }
+    }
+  }
+
+  private void recoverLastMaxSerialNumber() {
+    if (pipeLogStartNumber.isEmpty()) {
+      return;
+    }
+
+    File writingPipeLog =
+        new File(pipeLogDir, SyncConstant.getPipeLogName(pipeLogStartNumber.peekLast()));
+    try {
+      List<PipeData> recoverPipeData = parsePipeLog(writingPipeLog);
+      int recoverPipeDataSize = recoverPipeData.size();
+      lastMaxSerialNumber =
+          recoverPipeDataSize == 0
+              ? pipeLogStartNumber.peekLast() - 1
+              : recoverPipeData.get(recoverPipeDataSize - 1).getSerialNumber();
+    } catch (IOException e) {
+      logger.error(
+          String.format(
+              "Can not recover inputQueue from %s, because %s.", writingPipeLog.getPath(), e));
+    }
+  }
+
+  private void recoverCommitSerialNumber() {
+    File commitLog = new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME);
+    if (!commitLog.exists()) {
+      if (!pipeLogStartNumber.isEmpty()) {
+        commitSerialNumber = pipeLogStartNumber.peek() - 1;
+      }
+      return;
+    }
+
+    try (RandomAccessFile raf = new RandomAccessFile(commitLog, "r")) {
+      if (raf.length() >= Long.BYTES) {
+        raf.seek(raf.length() - Long.BYTES);
+        commitSerialNumber = raf.readLong();
+      }
+    } catch (IOException e) {
+      logger.error(
+          String.format(
+              "deserialize remove serial number error, remove serial number has been set to %d, because %s",
+              commitSerialNumber, e));
+    }
+  }
+
+  private void recoverOutputDeque() {
+    if (pipeLogStartNumber.isEmpty()) {
+      return;
+    }
+
+    File readingPipeLog =
+        new File(pipeLogDir, SyncConstant.getPipeLogName(pipeLogStartNumber.peek()));
+    try {
+      List<PipeData> recoverPipeData = parsePipeLog(readingPipeLog);
+      int recoverPipeDataSize = recoverPipeData.size();
+      for (int i = recoverPipeDataSize - 1; i >= 0; --i) {
+        PipeData pipeData = recoverPipeData.get(i);
+        if (pipeData.getSerialNumber() <= commitSerialNumber) {
+          break;
+        }
+        outputDeque.addFirst(pipeData);
+      }
+    } catch (IOException e) {
+      logger.error(
+          String.format(
+              "Recover output deque from pipe log %s error, because %s.",
+              readingPipeLog.getPath(), e));
+    }
+  }
+
+  public long getLastMaxSerialNumber() {
+    return lastMaxSerialNumber;
+  }
+
+  public long getCommitSerialNumber() {
+    return commitSerialNumber;
+  }
+
+  /** input */
+  @Override
+  public boolean offer(PipeData pipeData) {
+    if (outputStream == null || currentPipeLogSize > SyncConstant.DEFAULT_PIPE_LOG_SIZE_IN_BYTE) {
+      try {
+        moveToNextPipeLog(pipeData.getSerialNumber());
+      } catch (IOException e) {
+        logger.warn(String.format("Move to next pipe log %s error, because %s.", pipeData, e));
+      }
+    }
+    if (!inputDeque.offer(pipeData)) {
+      return false;
+    }
+    synchronized (waitLock) {
+      waitLock.notifyAll();
+    }
+
+    try {
+      writeToDisk(pipeData);
+    } catch (IOException e) {
+      logger.error(String.format("Record pipe data %s error, because %s.", pipeData, e));
+      return false;
+    }
+    return true;
+  }
+
+  private synchronized void moveToNextPipeLog(long startSerialNumber) throws IOException {
+    if (outputStream != null) {
+      outputStream.close();
+    }
+    File newPipeLog = new File(pipeLogDir, SyncConstant.getPipeLogName(startSerialNumber));
+    createFile(newPipeLog);
+
+    outputStream = new DataOutputStream(new FileOutputStream(newPipeLog));
+    pipeLogStartNumber.offer(startSerialNumber);
+    currentPipeLogSize = 0;
+
+    inputDeque = new LinkedBlockingDeque<>();
+    if (commitSerialNumber == Long.MIN_VALUE) {
+      commitSerialNumber = startSerialNumber - 1;
+    }
+  }
+
+  private void writeToDisk(PipeData pipeData) throws IOException {
+    // skip trick
+
+    currentPipeLogSize += pipeData.serialize(outputStream);
+    outputStream.flush();
+  }
+
+  /** output */
+  private synchronized PipeData pullOnePipeData(long lastSerialNumber) throws IOException {
+    long serialNumber = lastSerialNumber + 1;
+    if (!outputDeque.isEmpty()) {
+      return outputDeque.poll();
+    } else if (outputDeque != inputDeque) {
+      if (pipeLogStartNumber.isEmpty()) {
+        return null;
+      }
+
+      if (serialNumber > pipeLogStartNumber.peekLast()) {
+        return null;
+      } else if (serialNumber == pipeLogStartNumber.peekLast() && inputDeque != null) {
+        outputDeque = inputDeque;
+      } else {
+        List<PipeData> parsePipeData =
+            parsePipeLog(new File(pipeLogDir, SyncConstant.getPipeLogName(serialNumber)));
+        int parsePipeDataSize = parsePipeData.size();
+        outputDeque = new LinkedBlockingDeque<>();
+        for (int i = 0; i < parsePipeDataSize; i++) {
+          outputDeque.offer(parsePipeData.get(i));
+        }
+      }
+      return outputDeque.poll();
+    }
+    return null;
+  }
+
+  @Override
+  public List<PipeData> pull(long serialNumber) {
+    List<PipeData> resPipeData = new ArrayList<>();
+
+    pullSerialNumber = commitSerialNumber;
+    while (pullSerialNumber < serialNumber) {
+      try {
+        PipeData pullPipeData = pullOnePipeData(pullSerialNumber);
+        if (pullPipeData != null) {
+          resPipeData.add(pullPipeData);
+          pullSerialNumber = pullPipeData.getSerialNumber();
+        } else {
+          break;
+        }
+      } catch (IOException e) {
+        logger.error(
+            String.format(
+                "Pull pipe data serial number %s error, because %s.", pullSerialNumber + 1, e));
+        break;
+      }
+    }
+
+    for (int i = resPipeData.size() - 1; i >= 0; --i) {
+      outputDeque.addFirst(resPipeData.get(i));
+    }
+    return resPipeData;
+  }
+
+  @Override
+  public PipeData take() throws InterruptedException {
+    PipeData pipeData = null;
+    try {
+      synchronized (waitLock) {
+        pipeData = pullOnePipeData(commitSerialNumber);
+        if (pipeData == null) {
+          waitLock.wait();
+          waitLock.notifyAll();
+          pipeData = pullOnePipeData(commitSerialNumber);
+        }
+      }
+    } catch (IOException e) {
+      logger.error(
+          String.format(
+              "Blocking pull pipe data number %s error, because %s", commitSerialNumber + 1, e));
+    }
+    outputDeque.addFirst(pipeData);
+    pullSerialNumber = pipeData.getSerialNumber();
+    return pipeData;
+  }
+
+  @Override
+  public void commit() {
+    deletePipeData();
+    deletePipeLog();
+    serializeCommitSerialNumber();
+  }
+
+  private void deletePipeData() {
+    while (commitSerialNumber < pullSerialNumber) {
+      commitSerialNumber += 1;
+      try {
+        PipeData commitData = pullOnePipeData(commitSerialNumber);
+        if (PipeData.Type.TSFILE.equals(commitData.getType())) {
+          List<File> tsFiles = ((TsFilePipeData) commitData).getTsFiles();
+          for (File file : tsFiles) {
+            Files.deleteIfExists(file.toPath());
+          }
+        }
+      } catch (IOException e) {
+        logger.error(
+            String.format(
+                "Commit pipe data serial number %s error, because %s.", commitSerialNumber, e));
+      }
+    }
+  }
+
+  private void deletePipeLog() {
+    if (pipeLogStartNumber.size() >= 2) {
+      long nowPipeLogStartNumber;
+      while (true) {
+        nowPipeLogStartNumber = pipeLogStartNumber.poll();
+        if (!pipeLogStartNumber.isEmpty() && pipeLogStartNumber.peek() <= commitSerialNumber) {
+          try {
+            Files.deleteIfExists(
+                new File(pipeLogDir, SyncConstant.getPipeLogName(nowPipeLogStartNumber)).toPath());
+          } catch (IOException e) {
+            logger.warn(
+                String.format("Delete %s-pipe.log error, because %s.", nowPipeLogStartNumber, e));
+          }
+        } else {
+          break;
+        }
+      }
+      pipeLogStartNumber.addFirst(nowPipeLogStartNumber);
+    }
+  }
+
+  private void serializeCommitSerialNumber() {
+    try {
+      if (commitLogWriter == null) {
+        commitLogWriter =
+            new DataOutputStream(
+                new FileOutputStream(new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME)));
+        currentCommitLogSize = 0;
+      }
+      commitLogWriter.writeLong(commitSerialNumber);
+      commitLogWriter.flush();
+      currentCommitLogSize += Long.BYTES;
+      if (currentCommitLogSize >= SyncConstant.DEFAULT_PIPE_LOG_SIZE_IN_BYTE) {
+        commitLogWriter.close();
+        commitLogWriter = null;
+      }
+    } catch (IOException e) {
+      logger.error(
+          String.format(
+              "Serialize commit serial number %s error, because %s.", commitSerialNumber, e));
+    }
+  }
+
+  /** common */
+  @Override
+  public void clear() {
+    try {
+      if (outputStream != null) {
+        outputStream.close();
+        outputStream = null;
+      }
+      if (commitLogWriter != null) {
+        commitLogWriter.close();
+        commitLogWriter = null;
+      }
+
+      inputDeque = null;
+      pipeLogStartNumber = null;
+      outputDeque = null;
+      File logDir = new File(pipeLogDir);
+      if (logDir.exists()) {
+        FileUtils.deleteDirectory(logDir);
+      }
+    } catch (IOException e) {
+      logger.warn(String.format("Clear pipe log dir %s error, because %s.", pipeLogDir, e));
+    }
+  }
+
+  private boolean createFile(File file) throws IOException {
+    if (!file.getParentFile().exists()) {
+      file.getParentFile().mkdirs();
+    }
+    return file.createNewFile();
+  }
+
+  public static List<PipeData> parsePipeLog(File file) throws IOException {
+    List<PipeData> pipeData = new ArrayList<>();
+    try (DataInputStream inputStream = new DataInputStream(new FileInputStream(file))) {
+      while (true) {
+        pipeData.add(PipeData.deserialize(inputStream));
+      }
+    } catch (EOFException e) {
+      logger.info(String.format("Finish parsing pipeLog %s.", file.getPath()));
+    } catch (IllegalPathException e) {
+      logger.error(String.format("Parsing pipeLog %s error, because %s", file.getPath(), e));
+      throw new IOException(e);
+    }
+    return pipeData;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/utils/SyncConstant.java b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/queue/PipeDataQueue.java
similarity index 61%
rename from server/src/main/java/org/apache/iotdb/db/newsync/utils/SyncConstant.java
rename to server/src/main/java/org/apache/iotdb/db/newsync/pipedata/queue/PipeDataQueue.java
index ecf0bd3..6adac03 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/utils/SyncConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/queue/PipeDataQueue.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,13 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.newsync.utils;
+package org.apache.iotdb.db.newsync.pipedata.queue;
 
-public class SyncConstant {
-  public static final String SYNC_SYS_DIR = "sys";
-  public static final String RECEIVER_DIR = "receiver";
-  public static final String RECEIVER_LOG_NAME = "receiverService.log";
-  public static final String PIPELOG_DIR_NAME = "pipe-log";
-  public static final String FILEDATA_DIR_NAME = "file-data";
-  public static final String COLLECTOR_SUFFIX = ".collector";
+import org.apache.iotdb.db.newsync.pipedata.PipeData;
+
+import java.util.List;
+
+public interface PipeDataQueue {
+  boolean offer(PipeData data);
+
+  List<PipeData> pull(long serialNumber);
+
+  PipeData take() throws InterruptedException;
+
+  void commit();
+
+  void clear();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
index 7729815..4ded585 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
@@ -20,11 +20,11 @@ package org.apache.iotdb.db.newsync.receiver;
 
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
 import org.apache.iotdb.db.newsync.receiver.collector.Collector;
 import org.apache.iotdb.db.newsync.receiver.manager.PipeInfo;
 import org.apache.iotdb.db.newsync.receiver.manager.PipeStatus;
 import org.apache.iotdb.db.newsync.receiver.manager.ReceiverManager;
-import org.apache.iotdb.db.newsync.utils.SyncPathUtil;
 import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
 import org.apache.iotdb.db.qp.utils.DatetimeUtils;
 import org.apache.iotdb.db.query.dataset.ListDataSet;
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/collector/Collector.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/collector/Collector.java
index 9321f5c..c278b74 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/collector/Collector.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/collector/Collector.java
@@ -22,10 +22,10 @@ package org.apache.iotdb.db.newsync.receiver.collector;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.newsync.conf.BufferedPipeDataBlockingQueue;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
 import org.apache.iotdb.db.newsync.pipedata.PipeData;
-import org.apache.iotdb.db.newsync.utils.BufferedPipeDataBlockingQueue;
-import org.apache.iotdb.db.newsync.utils.SyncConstant;
-import org.apache.iotdb.db.newsync.utils.SyncPathUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLog.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLog.java
index c0dc7b4..7222919 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLog.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLog.java
@@ -18,9 +18,9 @@
  */
 package org.apache.iotdb.db.newsync.receiver.recovery;
 
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
 import org.apache.iotdb.db.newsync.receiver.manager.PipeStatus;
-import org.apache.iotdb.db.newsync.utils.SyncConstant;
-import org.apache.iotdb.db.newsync.utils.SyncPathUtil;
 
 import java.io.BufferedWriter;
 import java.io.File;
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzer.java
index 5ffe24c..bbf55a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzer.java
@@ -19,10 +19,10 @@
 package org.apache.iotdb.db.newsync.receiver.recovery;
 
 import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
 import org.apache.iotdb.db.newsync.receiver.manager.PipeInfo;
 import org.apache.iotdb.db.newsync.receiver.manager.PipeStatus;
-import org.apache.iotdb.db.newsync.utils.SyncConstant;
-import org.apache.iotdb.db.newsync.utils.SyncPathUtil;
 import org.apache.iotdb.db.service.ServiceType;
 
 import org.slf4j.Logger;
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/conf/.SenderConf.java.swp b/server/src/main/java/org/apache/iotdb/db/newsync/sender/conf/.SenderConf.java.swp
deleted file mode 100644
index 18a3081..0000000
Binary files a/server/src/main/java/org/apache/iotdb/db/newsync/sender/conf/.SenderConf.java.swp and /dev/null differ
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/conf/SenderConf.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/conf/SenderConf.java
deleted file mode 100644
index 813f259..0000000
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/conf/SenderConf.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.newsync.sender.conf;
-
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.newsync.sender.pipe.Pipe;
-
-import java.io.File;
-
-public class SenderConf {
-  public static final String defaultPipeSinkIP = "127.0.0.1";
-  public static final int defaultPipeSinkPort = 6670;
-
-  public static final String syncBaseDir =
-      IoTDBDescriptor.getInstance().getConfig().getNewSyncDir();
-
-  public static final String pipeDir = syncBaseDir + File.separator + "sender_pipe ";
-  public static final String pipeCollectFinishLockName = "finishCollect.lock";
-  public static final Long defaultWaitingForTsFileCloseMilliseconds = 500L;
-  public static final Long defaultWaitingForTsFileRetryNumber = 10L;
-  public static final Long defaultWaitingForDeregisterMilliseconds = 100L;
-  public static final String tsFileDirName = "TsFile_data";
-  public static final String modsOffsetFileSuffix = ".offset";
-  public static final String pipeLogDirName = "Pipe_log";
-  public static final String historyPipeLogName = "pipe_data.log.history";
-  public static final String realTimePipeLogNameSuffix = "-pipe_data.log";
-  public static final Long defaultPipeLogSizeInByte = 10485760L;
-  public static final String removeSerialNumberLogName = "remove_serial_number.log";
-
-  public static final String sysDir = syncBaseDir + File.separator + "sys";
-  public static final String senderLog = sysDir + File.separator + "senderService.log";
-  public static final String planSplitCharacter = ",";
-  public static final String senderLogSplitCharacter = " ";
-
-  public static String getPipeDir(Pipe pipe) {
-    return pipeDir + "_" + pipe.getName() + "_" + pipe.getCreateTime();
-  }
-
-  public static String getRealTimePipeLogName(long serialNumber) {
-    return serialNumber + realTimePipeLogNameSuffix;
-  }
-
-  public static Long getSerialNumberFromPipeLogName(String pipeLogName) {
-    return Long.parseLong(pipeLogName.split("-")[0]);
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/IoTDBPipeSink.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/IoTDBPipeSink.java
index d4ea459..5180919 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/IoTDBPipeSink.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/IoTDBPipeSink.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.newsync.sender.pipe;
 
 import org.apache.iotdb.db.exception.PipeSinkException;
-import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 public class IoTDBPipeSink implements PipeSink {
@@ -31,8 +31,8 @@ public class IoTDBPipeSink implements PipeSink {
   private int port;
 
   public IoTDBPipeSink(String name) {
-    ip = SenderConf.defaultPipeSinkIP;
-    port = SenderConf.defaultPipeSinkPort;
+    ip = SyncConstant.DEFAULT_PIPE_SINK_IP;
+    port = SyncConstant.DEFAULT_PIPE_SINK_PORT;
     this.name = name;
     type = Type.IoTDB;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/TsFilePipe.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/TsFilePipe.java
index 99e6d44..6fd1fb9 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/TsFilePipe.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/TsFilePipe.java
@@ -27,13 +27,14 @@ import org.apache.iotdb.db.engine.storagegroup.virtualSg.StorageGroupManager;
 import org.apache.iotdb.db.exception.PipeException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
 import org.apache.iotdb.db.newsync.pipedata.DeletionPipeData;
 import org.apache.iotdb.db.newsync.pipedata.PipeData;
 import org.apache.iotdb.db.newsync.pipedata.SchemaPipeData;
 import org.apache.iotdb.db.newsync.pipedata.TsFilePipeData;
-import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
-import org.apache.iotdb.db.newsync.sender.recovery.TsFilePipeLog;
 import org.apache.iotdb.db.newsync.sender.recovery.TsFilePipeLogAnalyzer;
+import org.apache.iotdb.db.newsync.sender.recovery.TsFilePipeLogger;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.db.service.IoTDB;
@@ -64,7 +65,7 @@ public class TsFilePipe implements Pipe {
   private final boolean syncDelOp;
 
   private final ExecutorService singleExecutorService;
-  private final TsFilePipeLog pipeLog;
+  private final TsFilePipeLogger pipeLog;
   private final ReentrantLock collectRealTimeDataLock;
 
   private BlockingDeque<PipeData> pipeDataDeque;
@@ -81,7 +82,7 @@ public class TsFilePipe implements Pipe {
     this.dataStartTime = dataStartTime;
     this.syncDelOp = syncDelOp;
 
-    this.pipeLog = new TsFilePipeLog(this);
+    this.pipeLog = new TsFilePipeLogger(this);
     this.singleExecutorService =
         IoTDBThreadPoolFactory.newSingleThreadExecutor(
             ThreadName.PIPE_SERVICE.getName() + "-" + name);
@@ -106,7 +107,7 @@ public class TsFilePipe implements Pipe {
   public synchronized void start() throws PipeException {
     if (status == PipeStatus.DROP) {
       throw new PipeException(
-          String.format("Can not start pipe %s, because the pipe is drop.", name));
+          String.format("Can not start pipe %s, because the pipe has been drop.", name));
     } else if (status == PipeStatus.RUNNING) {
       return;
     }
@@ -128,7 +129,9 @@ public class TsFilePipe implements Pipe {
       status = PipeStatus.RUNNING;
     } catch (IOException e) {
       logger.error(
-          String.format("Clear pipe dir %s error, because %s.", SenderConf.getPipeDir(this), e));
+          String.format(
+              "Clear pipe dir %s error, because %s.",
+              SyncPathUtil.getSenderPipeDir(name, createTime), e));
       throw new PipeException("Start error, can not clear pipe log.");
     }
   }
@@ -384,7 +387,7 @@ public class TsFilePipe implements Pipe {
     while (!pipeDataDeque.isEmpty() && pipeDataDeque.peek().getSerialNumber() <= serialNumber) {
       PipeData data = pipeDataDeque.poll();
       try {
-        pipeLog.removePipeData(pipeDataDeque.poll().getSerialNumber());
+        pipeLog.removePipeData(data);
       } catch (IOException e) {
         logger.warn(
             String.format(
@@ -432,7 +435,7 @@ public class TsFilePipe implements Pipe {
     singleExecutorService.shutdown();
 
     try {
-      Thread.sleep(SenderConf.defaultWaitingForDeregisterMilliseconds);
+      Thread.sleep(SyncConstant.DEFAULT_WAITING_FOR_DEREGISTER_MILLISECONDS);
     } catch (InterruptedException e) {
       logger.warn(
           String.format(
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/SenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/SenderLogAnalyzer.java
index 8f2364d..2504145 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/SenderLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/SenderLogAnalyzer.java
@@ -20,7 +20,8 @@
 package org.apache.iotdb.db.newsync.sender.recovery;
 
 import org.apache.iotdb.db.exception.PipeException;
-import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
 import org.apache.iotdb.db.newsync.sender.pipe.Pipe;
 import org.apache.iotdb.db.newsync.sender.pipe.PipeSink;
 import org.apache.iotdb.db.newsync.sender.service.SenderService;
@@ -48,7 +49,7 @@ public class SenderLogAnalyzer {
   private Pipe.PipeStatus runningPipeStatus;
 
   public SenderLogAnalyzer() throws IOException {
-    senderLog = new File(SenderConf.senderLog);
+    senderLog = new File(SyncPathUtil.getSysDir(), SyncConstant.SENDER_LOG_NAME);
     if (!senderLog.exists()) {
       senderLog.createNewFile();
     }
@@ -67,7 +68,7 @@ public class SenderLogAnalyzer {
     try {
       while ((readLine = br.readLine()) != null) {
         lineNumber += 1;
-        parseStrings = readLine.split(SenderConf.senderLogSplitCharacter);
+        parseStrings = readLine.split(SyncConstant.SENDER_LOG_SPLIT_CHARACTER);
         Operator.OperatorType type = Operator.OperatorType.valueOf(parseStrings[0]);
 
         switch (type) {
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/SenderLogger.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/SenderLogger.java
index 7f18cda..9afba24 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/SenderLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/SenderLogger.java
@@ -19,7 +19,8 @@
  */
 package org.apache.iotdb.db.newsync.sender.recovery;
 
-import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
@@ -45,7 +46,7 @@ public class SenderLogger {
         return;
       }
 
-      File senderLog = new File(SenderConf.senderLog);
+      File senderLog = new File(SyncPathUtil.getSysDir(), SyncConstant.SENDER_LOG_NAME);
       if (!senderLog.exists()) {
         if (!senderLog.getParentFile().exists()) {
           senderLog.getParentFile().mkdirs();
@@ -76,7 +77,7 @@ public class SenderLogger {
     getBufferedWriter();
     try {
       bw.write(Operator.OperatorType.DROP_PIPESINK.name());
-      bw.write(SenderConf.senderLogSplitCharacter);
+      bw.write(SyncConstant.SENDER_LOG_SPLIT_CHARACTER);
       bw.write(pipeSinkName);
       bw.newLine();
       bw.flush();
@@ -89,7 +90,7 @@ public class SenderLogger {
     getBufferedWriter();
     try {
       bw.write(Operator.OperatorType.CREATE_PIPE.name());
-      bw.write(SenderConf.senderLogSplitCharacter);
+      bw.write(SyncConstant.SENDER_LOG_SPLIT_CHARACTER);
       bw.write(String.valueOf(pipeCreateTime));
       bw.newLine();
       bw.write(plan.toString());
@@ -104,7 +105,7 @@ public class SenderLogger {
     getBufferedWriter();
     try {
       bw.write(type.name());
-      bw.write(SenderConf.senderLogSplitCharacter);
+      bw.write(SyncConstant.SENDER_LOG_SPLIT_CHARACTER);
       bw.write(pipeName);
       bw.newLine();
       bw.flush();
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogAnalyzer.java
index 13750f1..f0ac0ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogAnalyzer.java
@@ -20,8 +20,9 @@
 package org.apache.iotdb.db.newsync.sender.recovery;
 
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
 import org.apache.iotdb.db.newsync.pipedata.PipeData;
-import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
 import org.apache.iotdb.db.newsync.sender.pipe.TsFilePipe;
 
 import org.slf4j.Logger;
@@ -51,8 +52,8 @@ public class TsFilePipeLogAnalyzer {
   private long removeSerialNumber;
 
   public TsFilePipeLogAnalyzer(TsFilePipe pipe) {
-    pipeDir = SenderConf.getPipeDir(pipe);
-    pipeLogDir = new File(pipeDir, SenderConf.pipeLogDirName).getPath();
+    pipeDir = SyncPathUtil.getSenderPipeDir(pipe.getName(), pipe.getCreateTime());
+    pipeLogDir = new File(pipeDir, SyncConstant.PIPE_LOG_DIR_NAME).getPath();
   }
 
   public BlockingDeque<PipeData> recover() {
@@ -71,7 +72,7 @@ public class TsFilePipeLogAnalyzer {
   }
 
   private void deserializeRemoveSerialNumber() {
-    File removeSerialNumberLog = new File(pipeLogDir, SenderConf.removeSerialNumberLogName);
+    File removeSerialNumberLog = new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME);
     if (!removeSerialNumberLog.exists()) {
       return;
     }
@@ -98,7 +99,7 @@ public class TsFilePipeLogAnalyzer {
   }
 
   private void recoverHistoryData() {
-    File historyPipeLog = new File(pipeLogDir, SenderConf.historyPipeLogName);
+    File historyPipeLog = new File(pipeLogDir, SyncConstant.HISTORY_PIPE_LOG_NAME);
     if (!historyPipeLog.exists()) {
       return;
     }
@@ -134,14 +135,13 @@ public class TsFilePipeLogAnalyzer {
 
     List<Long> startNumbers = new ArrayList<>();
     for (File file : pipeLogDir.listFiles())
-      if (file.getName().endsWith(SenderConf.realTimePipeLogNameSuffix)) {
-        startNumbers.add(SenderConf.getSerialNumberFromPipeLogName(file.getName()));
+      if (file.getName().endsWith(SyncConstant.PIPE_LOG_NAME_SUFFIX)) {
+        startNumbers.add(SyncConstant.getSerialNumberFromPipeLogName(file.getName()));
       }
     if (startNumbers.size() != 0) {
       Collections.sort(startNumbers);
       for (Long startNumber : startNumbers) {
-        File realTimePipeLog =
-            new File(this.pipeLogDir, SenderConf.getRealTimePipeLogName(startNumber));
+        File realTimePipeLog = new File(this.pipeLogDir, SyncConstant.getPipeLogName(startNumber));
         try {
           List<PipeData> realTimeData = parseFile(realTimePipeLog);
           for (PipeData data : realTimeData)
@@ -158,7 +158,7 @@ public class TsFilePipeLogAnalyzer {
   }
 
   public boolean isCollectFinished() {
-    return new File(pipeDir, SenderConf.pipeCollectFinishLockName).exists();
+    return new File(pipeDir, SyncConstant.FINISH_COLLECT_LOCK_NAME).exists();
   }
 
   public static List<PipeData> parseFile(File file) throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLog.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogger.java
similarity index 81%
rename from server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLog.java
rename to server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogger.java
index 68a09d2..4169e8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLog.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogger.java
@@ -22,9 +22,10 @@ package org.apache.iotdb.db.newsync.sender.recovery;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
 import org.apache.iotdb.db.newsync.pipedata.PipeData;
 import org.apache.iotdb.db.newsync.pipedata.TsFilePipeData;
-import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
 import org.apache.iotdb.db.newsync.sender.pipe.TsFilePipe;
 import org.apache.iotdb.db.utils.FileUtils;
 
@@ -47,8 +48,8 @@ import java.util.List;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
 
-public class TsFilePipeLog {
-  private static final Logger logger = LoggerFactory.getLogger(TsFilePipeLog.class);
+public class TsFilePipeLogger {
+  private static final Logger logger = LoggerFactory.getLogger(TsFilePipeLogger.class);
 
   private final String pipeDir;
   private final String tsFileDir;
@@ -63,10 +64,10 @@ public class TsFilePipeLog {
   private BufferedWriter removeSerialNumberWriter;
   private long currentRemoveLogSize;
 
-  public TsFilePipeLog(TsFilePipe tsFilePipe) {
-    pipeDir = SenderConf.getPipeDir(tsFilePipe);
-    tsFileDir = new File(pipeDir, SenderConf.tsFileDirName).getPath();
-    pipeLogDir = new File(pipeDir, SenderConf.pipeLogDirName).getPath();
+  public TsFilePipeLogger(TsFilePipe tsFilePipe) {
+    pipeDir = SyncPathUtil.getSenderPipeDir(tsFilePipe.getName(), tsFilePipe.getCreateTime());
+    tsFileDir = new File(pipeDir, SyncConstant.FILE_DATA_DIR_NAME).getPath();
+    pipeLogDir = new File(pipeDir, SyncConstant.PIPE_LOG_DIR_NAME).getPath();
   }
 
   /** make hard link for tsfile * */
@@ -77,7 +78,7 @@ public class TsFilePipeLog {
       File modsHardLink = createHardLink(mods);
       if (modsOffset != 0L) {
         serializeModsOffset(
-            new File(modsHardLink.getPath() + SenderConf.modsOffsetFileSuffix), modsOffset);
+            new File(modsHardLink.getPath() + SyncConstant.MODS_OFFSET_FILE_SUFFIX), modsOffset);
       }
     } else if (modsOffset != 0L) {
       logger.warn(
@@ -147,6 +148,7 @@ public class TsFilePipeLog {
   public void addHistoryPipeData(PipeData pipeData) throws IOException {
     getHistoryOutputStream();
     pipeData.serialize(historyOutputStream);
+    historyOutputStream.flush();
   }
 
   private void getHistoryOutputStream() throws IOException {
@@ -157,7 +159,7 @@ public class TsFilePipeLog {
     // recover history pipe log
     File logDir = new File(pipeLogDir);
     logDir.mkdirs();
-    File historyPipeLog = new File(pipeLogDir, SenderConf.historyPipeLogName);
+    File historyPipeLog = new File(pipeLogDir, SyncConstant.HISTORY_PIPE_LOG_NAME);
     createFile(historyPipeLog);
     historyOutputStream = new DataOutputStream(new FileOutputStream(historyPipeLog, true));
   }
@@ -165,6 +167,7 @@ public class TsFilePipeLog {
   public synchronized void addRealTimePipeData(PipeData pipeData) throws IOException {
     getRealTimeOutputStream(pipeData.getSerialNumber());
     currentPipeLogSize += pipeData.serialize(realTimeOutputStream);
+    realTimeOutputStream.flush();
   }
 
   private void getRealTimeOutputStream(long serialNumber) throws IOException {
@@ -176,8 +179,7 @@ public class TsFilePipeLog {
       if (!realTimePipeLogStartNumber.isEmpty()) {
         File writingPipeLog =
             new File(
-                pipeLogDir,
-                SenderConf.getRealTimePipeLogName(realTimePipeLogStartNumber.peekLast()));
+                pipeLogDir, SyncConstant.getPipeLogName(realTimePipeLogStartNumber.peekLast()));
         realTimeOutputStream = new DataOutputStream(new FileOutputStream(writingPipeLog, true));
         currentPipeLogSize = writingPipeLog.length();
       } else {
@@ -185,7 +187,7 @@ public class TsFilePipeLog {
       }
     }
 
-    if (currentPipeLogSize > SenderConf.defaultPipeLogSizeInByte) {
+    if (currentPipeLogSize > SyncConstant.DEFAULT_PIPE_LOG_SIZE_IN_BYTE) {
       moveToNextPipeLog(serialNumber);
     }
   }
@@ -197,8 +199,8 @@ public class TsFilePipeLog {
 
     logDir.mkdirs();
     for (File file : logDir.listFiles())
-      if (file.getName().endsWith(SenderConf.realTimePipeLogNameSuffix)) {
-        startNumbers.add(SenderConf.getSerialNumberFromPipeLogName(file.getName()));
+      if (file.getName().endsWith(SyncConstant.PIPE_LOG_NAME_SUFFIX)) {
+        startNumbers.add(SyncConstant.getSerialNumberFromPipeLogName(file.getName()));
       }
     if (startNumbers.size() != 0) {
       Collections.sort(startNumbers);
@@ -212,7 +214,7 @@ public class TsFilePipeLog {
     if (realTimeOutputStream != null) {
       realTimeOutputStream.close();
     }
-    File newPipeLog = new File(pipeLogDir, SenderConf.getRealTimePipeLogName(startSerialNumber));
+    File newPipeLog = new File(pipeLogDir, SyncConstant.getPipeLogName(startSerialNumber));
     createFile(newPipeLog);
 
     realTimeOutputStream = new DataOutputStream(new FileOutputStream(newPipeLog));
@@ -221,9 +223,19 @@ public class TsFilePipeLog {
   }
 
   /** remove pipe log data */
-  public void removePipeData(long serialNumber) throws IOException {
+  public void removePipeData(PipeData pipeData) throws IOException {
+    long serialNumber = pipeData.getSerialNumber();
     serializeRemoveSerialNumber(serialNumber);
 
+    // delete tsfile
+    if (PipeData.Type.TSFILE.equals(pipeData.getType())) {
+      List<File> tsFiles = ((TsFilePipeData) pipeData).getTsFiles();
+      for (File file : tsFiles) {
+        Files.deleteIfExists(file.toPath());
+      }
+    }
+
+    // delete pipe log
     if (serialNumber >= 0) {
       if (historyOutputStream != null) {
         removeHistoryPipeLog();
@@ -250,7 +262,7 @@ public class TsFilePipeLog {
   private void removeHistoryPipeLog() throws IOException {
     historyOutputStream.close();
     historyOutputStream = null;
-    File historyPipeLog = new File(pipeLogDir, SenderConf.historyPipeLogName);
+    File historyPipeLog = new File(pipeLogDir, SyncConstant.HISTORY_PIPE_LOG_NAME);
     try {
       Files.delete(historyPipeLog.toPath());
     } catch (NoSuchFileException e) {
@@ -260,8 +272,7 @@ public class TsFilePipeLog {
   }
 
   private void removeRealTimePipeLog(long serialNumber) throws IOException {
-    File realTimePipeLog = new File(pipeLogDir, SenderConf.getRealTimePipeLogName(serialNumber));
-    removeTsFile(realTimePipeLog);
+    File realTimePipeLog = new File(pipeLogDir, SyncConstant.getPipeLogName(serialNumber));
     try {
       Files.delete(realTimePipeLog.toPath());
     } catch (NoSuchFileException e) {
@@ -270,37 +281,17 @@ public class TsFilePipeLog {
     }
   }
 
-  private void removeTsFile(File realTimePipeLog) {
-    try {
-      List<PipeData> pipeData = TsFilePipeLogAnalyzer.parseFile(realTimePipeLog);
-      List<File> tsFiles;
-      for (PipeData data : pipeData)
-        if (PipeData.Type.TSFILE.equals(data.getType())) {
-          tsFiles = ((TsFilePipeData) data).getTsFiles();
-          for (File file : tsFiles) {
-            Files.deleteIfExists(file.toPath());
-          }
-        }
-    } catch (IOException e) {
-      logger.warn(
-          String.format(
-              "Can not parse pipe log %s, the tsfiles in this pipe log will not be deleted, because %s",
-              realTimePipeLog.getPath(), e));
-    }
-  }
-
   private void serializeRemoveSerialNumber(long serialNumber) throws IOException {
     if (removeSerialNumberWriter == null) {
       removeSerialNumberWriter =
-          new BufferedWriter(
-              new FileWriter(new File(pipeLogDir, SenderConf.removeSerialNumberLogName)));
+          new BufferedWriter(new FileWriter(new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME)));
       currentRemoveLogSize = 0;
     }
     removeSerialNumberWriter.write(String.valueOf(serialNumber));
     removeSerialNumberWriter.newLine();
     removeSerialNumberWriter.flush();
     currentRemoveLogSize += Long.BYTES;
-    if (currentRemoveLogSize >= SenderConf.defaultPipeLogSizeInByte) {
+    if (currentRemoveLogSize >= SyncConstant.DEFAULT_PIPE_LOG_SIZE_IN_BYTE) {
       removeSerialNumberWriter.close();
       removeSerialNumberWriter = null;
     }
@@ -308,7 +299,7 @@ public class TsFilePipeLog {
 
   public void finishCollect() {
     try {
-      if (createFile(new File(pipeDir, SenderConf.pipeCollectFinishLockName))) {
+      if (createFile(new File(pipeDir, SyncConstant.FINISH_COLLECT_LOCK_NAME))) {
         logger.info(String.format("Create finish collecting Lock file in %s.", pipeDir));
       }
     } catch (IOException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/service/SenderService.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/service/SenderService.java
index 2caa321..fec7995 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/service/SenderService.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/service/SenderService.java
@@ -22,7 +22,8 @@ package org.apache.iotdb.db.newsync.sender.service;
 import org.apache.iotdb.db.exception.PipeException;
 import org.apache.iotdb.db.exception.PipeSinkException;
 import org.apache.iotdb.db.exception.StartupException;
-import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
 import org.apache.iotdb.db.newsync.sender.pipe.Pipe;
 import org.apache.iotdb.db.newsync.sender.pipe.PipeSink;
 import org.apache.iotdb.db.newsync.sender.pipe.TsFilePipe;
@@ -227,7 +228,7 @@ public class SenderService implements IService {
   /** IService * */
   @Override
   public void start() throws StartupException {
-    File senderLog = new File(SenderConf.senderLog);
+    File senderLog = new File(SyncPathUtil.getSysDir(), SyncConstant.SENDER_LOG_NAME);
     if (senderLog.exists()) {
       try {
         recover();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index 2a82e6e..4aa910e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -193,7 +193,9 @@ public abstract class Operator {
     PRUNE_TEMPLATE,
     APPEND_TEMPLATE,
     DROP_TEMPLATE,
+
     SHOW_QUERY_RESOURCE,
+
     CREATE_PIPESINK,
     DROP_PIPESINK,
     SHOW_PIPESINK,
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipePlan.java
index a6ca08b..97b4996 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipePlan.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.qp.physical.sys;
 
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -74,7 +74,7 @@ public class CreatePipePlan extends PhysicalPlan {
   }
 
   public static CreatePipePlan parseString(String parsedString) throws IOException {
-    String[] attributes = parsedString.split(SenderConf.planSplitCharacter);
+    String[] attributes = parsedString.split(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
     if (attributes.length < 4) {
       throw new IOException("Parsing CreatePipePlan error. Attributes is less than expected.");
     }
@@ -93,13 +93,17 @@ public class CreatePipePlan extends PhysicalPlan {
   @Override
   public String toString() {
     StringBuilder builder = new StringBuilder();
-    builder.append(pipeName).append(SenderConf.planSplitCharacter);
-    builder.append(pipeSinkName).append(SenderConf.planSplitCharacter);
-    builder.append(dataStartTimestamp).append(SenderConf.planSplitCharacter);
-    builder.append(pipeAttributes.size()).append(SenderConf.planSplitCharacter);
+    builder.append(pipeName).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+    builder.append(pipeSinkName).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+    builder.append(dataStartTimestamp).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+    builder.append(pipeAttributes.size()).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
     for (int i = 0; i < pipeAttributes.size(); i++) {
-      builder.append(pipeAttributes.get(i).left).append(SenderConf.planSplitCharacter);
-      builder.append(pipeAttributes.get(i).right).append(SenderConf.planSplitCharacter);
+      builder
+          .append(pipeAttributes.get(i).left)
+          .append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+      builder
+          .append(pipeAttributes.get(i).right)
+          .append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
     }
     return builder.toString();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipeSinkPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipeSinkPlan.java
index ca11f01..5321845 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipeSinkPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipeSinkPlan.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.qp.physical.sys;
 
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -64,7 +64,7 @@ public class CreatePipeSinkPlan extends PhysicalPlan {
   }
 
   public static CreatePipeSinkPlan parseString(String parsedString) throws IOException {
-    String[] attributes = parsedString.split(SenderConf.planSplitCharacter);
+    String[] attributes = parsedString.split(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
     if (attributes.length < 3) {
       throw new IOException("Parsing CreatePipeSinkPlan error. Attributes is less than expected.");
     }
@@ -82,12 +82,16 @@ public class CreatePipeSinkPlan extends PhysicalPlan {
   @Override
   public String toString() {
     StringBuilder builder = new StringBuilder();
-    builder.append(pipeSinkName).append(SenderConf.planSplitCharacter);
-    builder.append(pipeSinkType).append(SenderConf.planSplitCharacter);
-    builder.append(pipeSinkAttributes.size()).append(SenderConf.planSplitCharacter);
+    builder.append(pipeSinkName).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+    builder.append(pipeSinkType).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+    builder.append(pipeSinkAttributes.size()).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
     for (int i = 0; i < pipeSinkAttributes.size(); i++) {
-      builder.append(pipeSinkAttributes.get(i).left).append(SenderConf.planSplitCharacter);
-      builder.append(pipeSinkAttributes.get(i).right).append(SenderConf.planSplitCharacter);
+      builder
+          .append(pipeSinkAttributes.get(i).left)
+          .append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+      builder
+          .append(pipeSinkAttributes.get(i).right)
+          .append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
     }
     return builder.toString();
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/newsync/pipedata/BufferedPipeDataQueueTest.java b/server/src/test/java/org/apache/iotdb/db/newsync/pipedata/BufferedPipeDataQueueTest.java
new file mode 100644
index 0000000..33afae8
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/newsync/pipedata/BufferedPipeDataQueueTest.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.pipedata;
+
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
+import org.apache.iotdb.db.newsync.pipedata.queue.BufferedPipeDataQueue;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+public class BufferedPipeDataQueueTest {
+  File pipeLogDir =
+      new File(
+          SyncPathUtil.getReceiverPipeLogDir("pipe", "192.168.0.11", System.currentTimeMillis()));
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.envSetUp();
+    if (!pipeLogDir.exists()) {
+      pipeLogDir.mkdirs();
+    }
+  }
+
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    FileUtils.deleteDirectory(pipeLogDir);
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void testRecoveryAndClear() {
+    try {
+      DataOutputStream outputStream =
+          new DataOutputStream(
+              new FileOutputStream(new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME), true));
+      outputStream.writeLong(1);
+      outputStream.close();
+      // pipelog1: 0~3
+      DataOutputStream pipeLogOutput1 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(0)), false));
+      for (int i = 0; i < 4; i++) {
+        new TsFilePipeData(null, i).serialize(pipeLogOutput1);
+      }
+      pipeLogOutput1.close();
+      // pipelog2: 4~10
+      DataOutputStream pipeLogOutput2 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(4)), false));
+      for (int i = 4; i < 11; i++) {
+        new TsFilePipeData(null, i).serialize(pipeLogOutput2);
+      }
+      pipeLogOutput2.close();
+      // pipelog3: 11 without pipedata
+      DataOutputStream pipeLogOutput3 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(11)), false));
+      pipeLogOutput3.close();
+      // recovery
+      BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
+      Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+      Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+      pipeDataQueue.clear();
+      Assert.assertFalse(pipeLogDir.exists());
+    } catch (Exception e) {
+      Assert.fail();
+    }
+  }
+
+  /** Try to take data from a new pipe. Expect to wait indefinitely if no data is offered. */
+  // TODO: throws an NPE
+  @Test
+  public void testTake() {
+    BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
+    List<PipeData> pipeDatas = new ArrayList<>();
+    ExecutorService es1 = Executors.newSingleThreadExecutor();
+    es1.execute(
+        () -> {
+          try {
+            pipeDatas.add(pipeDataQueue.take());
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+        });
+    try {
+      Thread.sleep(3000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    es1.shutdownNow();
+
+    Assert.assertEquals(0, pipeDatas.size());
+  }
+
+  /** Try to take data from a new pipe. Expect to wake after offer. */
+  @Test
+  public void testTakeAndOffer() {
+    BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
+    List<PipeData> pipeDatas = new ArrayList<>();
+    ExecutorService es1 = Executors.newSingleThreadExecutor();
+    es1.execute(
+        () -> {
+          try {
+            pipeDatas.add(pipeDataQueue.take());
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+        });
+    pipeDataQueue.offer(new TsFilePipeData(null, 0));
+    try {
+      Thread.sleep(3000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    es1.shutdownNow();
+    try {
+      Thread.sleep(500);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+    Assert.assertEquals(1, pipeDatas.size());
+    pipeDataQueue.clear();
+  }
+
+  /** Try to offer data to a new pipe. */
+  @Test
+  public void testOfferNewPipe() {
+    BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
+    PipeData pipeData = new TsFilePipeData("fakePath", 1);
+    pipeDataQueue.offer(pipeData);
+    List<PipeData> pipeDatas = new ArrayList<>();
+    ExecutorService es1 = Executors.newSingleThreadExecutor();
+    es1.execute(
+        () -> {
+          try {
+            pipeDatas.add(pipeDataQueue.take());
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+        });
+    try {
+      Thread.sleep(3000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    es1.shutdownNow();
+    try {
+      Thread.sleep(500);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+    Assert.assertEquals(1, pipeDatas.size());
+    Assert.assertEquals(pipeData, pipeDatas.get(0));
+    pipeDataQueue.clear();
+  }
+
+  /**
+   * Step 1: recover pipeDataQueue (with an empty latest pipelog). Step 2: offer new pipeData.
+   * Step 3: check the result.
+   */
+  @Test
+  public void testOfferAfterRecoveryWithEmptyPipeLog() {
+    try {
+      DataOutputStream outputStream =
+          new DataOutputStream(
+              new FileOutputStream(new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME), true));
+      outputStream.writeLong(1);
+      outputStream.close();
+      List<PipeData> pipeDataList = new ArrayList<>();
+      // pipelog1: 0~3
+      DataOutputStream pipeLogOutput1 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(0)), false));
+      for (int i = 0; i < 4; i++) {
+        PipeData pipeData = new TsFilePipeData("fake" + i, i);
+        pipeDataList.add(pipeData);
+        pipeData.serialize(pipeLogOutput1);
+      }
+      pipeLogOutput1.close();
+      // pipelog2: 4~10
+      DataOutputStream pipeLogOutput2 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(4)), false));
+      for (int i = 4; i < 8; i++) {
+        PipeData pipeData =
+            new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 99), i);
+        pipeDataList.add(pipeData);
+        pipeData.serialize(pipeLogOutput2);
+      }
+      for (int i = 8; i < 11; i++) {
+        PipeData pipeData =
+            new SchemaPipeData(new SetStorageGroupPlan(new PartialPath("fake" + i)), i);
+        pipeDataList.add(pipeData);
+        pipeData.serialize(pipeLogOutput2);
+      }
+      pipeLogOutput2.close();
+      // pipelog3: 11 without pipedata
+      DataOutputStream pipeLogOutput3 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(11)), false));
+      pipeLogOutput3.close();
+      // recovery
+      BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
+      Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+      Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+      PipeData offerPipeData = new TsFilePipeData(null, 11);
+      pipeDataList.add(offerPipeData);
+      pipeDataQueue.offer(offerPipeData);
+
+      // take and check
+      List<PipeData> pipeDataTakeList = new ArrayList<>();
+      ExecutorService es1 = Executors.newSingleThreadExecutor();
+      es1.execute(
+          () -> {
+            while (true) {
+              try {
+                pipeDataTakeList.add(pipeDataQueue.take());
+                pipeDataQueue.commit();
+              } catch (InterruptedException e) {
+                break;
+              }
+            }
+          });
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+      es1.shutdownNow();
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+        Assert.fail();
+      }
+      Assert.assertEquals(10, pipeDataTakeList.size());
+      for (int i = 0; i < 10; i++) {
+        Assert.assertEquals(pipeDataList.get(i + 2), pipeDataTakeList.get(i));
+      }
+      pipeDataQueue.clear();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+  }
+
+  /** Step1: recover pipeDataQueue (with an empty latest pipelog) Step2: check result */
+  @Test
+  public void testRecoveryWithEmptyPipeLog() {
+    try {
+      DataOutputStream outputStream =
+          new DataOutputStream(
+              new FileOutputStream(new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME), true));
+      outputStream.writeLong(1);
+      outputStream.close();
+      List<PipeData> pipeDataList = new ArrayList<>();
+      // pipelog1: 0~3
+      DataOutputStream pipeLogOutput1 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(0)), false));
+      for (int i = 0; i < 4; i++) {
+        PipeData pipeData = new TsFilePipeData("fake" + i, i);
+        pipeDataList.add(pipeData);
+        pipeData.serialize(pipeLogOutput1);
+      }
+      pipeLogOutput1.close();
+      // pipelog2: 4~10
+      DataOutputStream pipeLogOutput2 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(4)), false));
+      for (int i = 4; i < 8; i++) {
+        PipeData pipeData =
+            new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 99), i);
+        pipeDataList.add(pipeData);
+        pipeData.serialize(pipeLogOutput2);
+      }
+      for (int i = 8; i < 11; i++) {
+        PipeData pipeData =
+            new SchemaPipeData(new SetStorageGroupPlan(new PartialPath("fake" + i)), i);
+        pipeDataList.add(pipeData);
+        pipeData.serialize(pipeLogOutput2);
+      }
+      pipeLogOutput2.close();
+      // pipelog3: 11 without pipedata
+      DataOutputStream pipeLogOutput3 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(11)), false));
+      pipeLogOutput3.close();
+      // recovery
+      BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
+      Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+      Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+
+      // take and check
+      List<PipeData> pipeDataTakeList = new ArrayList<>();
+      ExecutorService es1 = Executors.newSingleThreadExecutor();
+      es1.execute(
+          () -> {
+            while (true) {
+              try {
+                pipeDataTakeList.add(pipeDataQueue.take());
+                pipeDataQueue.commit();
+              } catch (InterruptedException e) {
+                break;
+              }
+            }
+          });
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+      es1.shutdownNow();
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+        Assert.fail();
+      }
+      Assert.assertEquals(9, pipeDataTakeList.size());
+      for (int i = 0; i < 9; i++) {
+        Assert.assertEquals(pipeDataList.get(i + 2), pipeDataTakeList.get(i));
+      }
+      pipeDataQueue.clear();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+  }
+
+  /** Step1: recover pipeDataQueue (without empty latest pipelog) Step2: check result */
+  @Test
+  public void testRecoveryWithoutEmptyPipeLog() {
+    try {
+      DataOutputStream outputStream =
+          new DataOutputStream(
+              new FileOutputStream(new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME), true));
+      outputStream.writeLong(1);
+      outputStream.close();
+      List<PipeData> pipeDataList = new ArrayList<>();
+      // pipelog1: 0~3
+      DataOutputStream pipeLogOutput1 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(0)), false));
+      for (int i = 0; i < 4; i++) {
+        PipeData pipeData = new TsFilePipeData("fake" + i, i);
+        pipeDataList.add(pipeData);
+        pipeData.serialize(pipeLogOutput1);
+      }
+      pipeLogOutput1.close();
+      // pipelog2: 4~10
+      DataOutputStream pipeLogOutput2 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(4)), false));
+      for (int i = 4; i < 8; i++) {
+        PipeData pipeData =
+            new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 99), i);
+        pipeDataList.add(pipeData);
+        pipeData.serialize(pipeLogOutput2);
+      }
+      for (int i = 8; i < 11; i++) {
+        PipeData pipeData =
+            new SchemaPipeData(new SetStorageGroupPlan(new PartialPath("fake" + i)), i);
+        pipeDataList.add(pipeData);
+        pipeData.serialize(pipeLogOutput2);
+      }
+      pipeLogOutput2.close();
+      ;
+      // recovery
+      BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
+      Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+      Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+
+      // take and check
+      List<PipeData> pipeDataTakeList = new ArrayList<>();
+      ExecutorService es1 = Executors.newSingleThreadExecutor();
+      es1.execute(
+          () -> {
+            while (true) {
+              try {
+                pipeDataTakeList.add(pipeDataQueue.take());
+                pipeDataQueue.commit();
+              } catch (InterruptedException e) {
+                break;
+              }
+            }
+          });
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+      es1.shutdownNow();
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+        Assert.fail();
+      }
+      Assert.assertEquals(9, pipeDataTakeList.size());
+      for (int i = 0; i < 9; i++) {
+        Assert.assertEquals(pipeDataList.get(i + 2), pipeDataTakeList.get(i));
+      }
+      pipeDataQueue.clear();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+  }
+
+  @Test
+  public void testOfferWhileTaking() {
+    try {
+      DataOutputStream outputStream =
+          new DataOutputStream(
+              new FileOutputStream(new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME), true));
+      outputStream.writeLong(1);
+      outputStream.close();
+      List<PipeData> pipeDataList = new ArrayList<>();
+      // pipelog1: 0~3
+      DataOutputStream pipeLogOutput1 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(0)), false));
+      for (int i = 0; i < 4; i++) {
+        PipeData pipeData = new TsFilePipeData("fake" + i, i);
+        pipeDataList.add(pipeData);
+        pipeData.serialize(pipeLogOutput1);
+      }
+      pipeLogOutput1.close();
+      // pipelog2: 4~10
+      DataOutputStream pipeLogOutput2 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(4)), false));
+      for (int i = 4; i < 8; i++) {
+        PipeData pipeData =
+            new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 99), i);
+        pipeDataList.add(pipeData);
+        pipeData.serialize(pipeLogOutput2);
+      }
+      for (int i = 8; i < 11; i++) {
+        PipeData pipeData =
+            new SchemaPipeData(new SetStorageGroupPlan(new PartialPath("fake" + i)), i);
+        pipeDataList.add(pipeData);
+        pipeData.serialize(pipeLogOutput2);
+      }
+      pipeLogOutput2.close();
+      ;
+      // recovery
+      BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
+      Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+      Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+
+      // take
+      List<PipeData> pipeDataTakeList = new ArrayList<>();
+      ExecutorService es1 = Executors.newSingleThreadExecutor();
+      es1.execute(
+          () -> {
+            while (true) {
+              try {
+                pipeDataTakeList.add(pipeDataQueue.take());
+                pipeDataQueue.commit();
+              } catch (InterruptedException e) {
+                break;
+              }
+            }
+          });
+      // offer
+      for (int i = 11; i < 20; i++) {
+        pipeDataQueue.offer(
+            new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 0), i));
+      }
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+      es1.shutdownNow();
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+        Assert.fail();
+      }
+      Assert.assertEquals(18, pipeDataTakeList.size());
+      for (int i = 0; i < 9; i++) {
+        Assert.assertEquals(pipeDataList.get(i + 2), pipeDataTakeList.get(i));
+      }
+      pipeDataQueue.clear();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/newsync/pipedata/PipeDataTest.java b/server/src/test/java/org/apache/iotdb/db/newsync/pipedata/PipeDataTest.java
index ed9d79f..ce65549 100644
--- a/server/src/test/java/org/apache/iotdb/db/newsync/pipedata/PipeDataTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/newsync/pipedata/PipeDataTest.java
@@ -74,6 +74,10 @@ public class PipeDataTest {
       Assert.assertEquals(pipeData3, PipeData.deserialize(inputStream));
       inputStream.close();
       outputStream.close();
+
+      Assert.assertEquals(pipeData1, PipeData.deserialize(pipeData1.serialize()));
+      Assert.assertEquals(pipeData2, PipeData.deserialize(pipeData2.serialize()));
+      Assert.assertEquals(pipeData3, PipeData.deserialize(pipeData3.serialize()));
     } catch (Exception e) {
       logger.error(e.getMessage());
       Assert.fail();