You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/02/21 08:42:52 UTC

[iotdb] branch new_sync updated: [To new_sync][IOTDB-2272] implement customized sync process: collector (#4989)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch new_sync
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/new_sync by this push:
     new ac9901c  [To new_sync][IOTDB-2272] implement customized sync process: collector (#4989)
ac9901c is described below

commit ac9901c2c97a6596e3224494ff8ee4c0b05ef18d
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Mon Feb 21 16:42:12 2022 +0800

    [To new_sync][IOTDB-2272] implement customized sync process: collector (#4989)
---
 .../sync/IoTDBSyncReceiverCollectorIT.java         | 485 +++++++++++++++++++++
 .../sync/IoTDBSyncReceiverLoaderIT.java            | 105 +----
 .../sync/{WriteUtil.java => SyncTestUtil.java}     | 107 ++++-
 .../org/apache/iotdb/db/concurrent/ThreadName.java |   1 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  13 +
 .../db/newsync/pipedata/DeletionPipeData.java      |  36 +-
 .../apache/iotdb/db/newsync/pipedata/PipeData.java |  33 +-
 .../iotdb/db/newsync/pipedata/SchemaPipeData.java  |  39 +-
 .../iotdb/db/newsync/pipedata/TsFilePipeData.java  |  35 +-
 .../iotdb/db/newsync/receiver/ReceiverService.java |  44 +-
 .../db/newsync/receiver/collector/Collector.java   | 188 +++++++-
 .../newsync/receiver/manager/ReceiverManager.java  |  29 +-
 .../db/newsync/receiver/recovery/ReceiverLog.java  |  20 +-
 .../receiver/recovery/ReceiverLogAnalyzer.java     |   6 +-
 .../sender/recovery/TsFilePipeLogAnalyzer.java     |   2 +
 .../utils/BufferedPipeDataBlockingQueue.java       |  79 ++++
 .../db/newsync/{conf => utils}/SyncConstant.java   |   7 +-
 .../iotdb/db/newsync/utils/SyncPathUtil.java       |  56 +++
 .../org/apache/iotdb/db/qp/logical/Operator.java   |   2 -
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   2 +
 .../iotdb/db/newsync/pipedata/PipeDataTest.java    |  82 ++++
 21 files changed, 1182 insertions(+), 189 deletions(-)

diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverCollectorIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverCollectorIT.java
new file mode 100644
index 0000000..1b13d13
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverCollectorIT.java
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration.sync;
+
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.newsync.pipedata.DeletionPipeData;
+import org.apache.iotdb.db.newsync.pipedata.PipeData;
+import org.apache.iotdb.db.newsync.pipedata.SchemaPipeData;
+import org.apache.iotdb.db.newsync.pipedata.TsFilePipeData;
+import org.apache.iotdb.db.newsync.receiver.collector.Collector;
+import org.apache.iotdb.db.newsync.utils.BufferedPipeDataBlockingQueue;
+import org.apache.iotdb.db.newsync.utils.SyncConstant;
+import org.apache.iotdb.db.newsync.utils.SyncPathUtil;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.itbase.category.LocalStandaloneTest;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+@Category({LocalStandaloneTest.class})
+public class IoTDBSyncReceiverCollectorIT {
+  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSyncReceiverLoaderIT.class);
+  protected static boolean enableSeqSpaceCompaction;
+  protected static boolean enableUnseqSpaceCompaction;
+  protected static boolean enableCrossSpaceCompaction;
+  /** create tsfile and move to tmpDir for sync test */
+  File tmpDir = new File("target/synctest");
+
+  String pipeName1 = "pipe1";
+  String remoteIp1 = "192.168.0.11";
+  long createdTime1 = System.currentTimeMillis();
+  File pipeLogDir1 =
+      new File(SyncPathUtil.getReceiverPipeLogDir(pipeName1, remoteIp1, createdTime1));
+  String pipeName2 = "pipe2";
+  String remoteIp2 = "192.168.0.22";
+  long createdTime2 = System.currentTimeMillis();
+  File pipeLogDir2 =
+      new File(SyncPathUtil.getReceiverPipeLogDir(pipeName2, remoteIp2, createdTime2));
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.envSetUp();
+    enableSeqSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
+    enableUnseqSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+    enableCrossSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
+    IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
+    IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
+    IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false);
+    SyncTestUtil.insertData();
+    EnvironmentUtils.shutdownDaemon();
+    File srcDir = new File(IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0]);
+    FileUtils.moveDirectory(srcDir, tmpDir);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+    FileUtils.deleteDirectory(tmpDir);
+    FileUtils.deleteDirectory(pipeLogDir1);
+    FileUtils.deleteDirectory(pipeLogDir2);
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void testOnePipe() throws Exception {
+    // 1. restart IoTDB
+    EnvironmentUtils.cleanEnv();
+    EnvironmentUtils.envSetUp();
+
+    // 2. prepare pipelog and .collector to test fail recovery
+    if (!pipeLogDir1.exists()) {
+      pipeLogDir1.mkdirs();
+    }
+    File pipeLog1 = new File(pipeLogDir1.getPath(), String.valueOf(System.currentTimeMillis()));
+    File collectFile1 = new File(pipeLog1.getPath() + SyncConstant.COLLECTOR_SUFFIX);
+    DataOutputStream pipeLogOutput = new DataOutputStream(new FileOutputStream(pipeLog1, false));
+    DataOutputStream collectOutput =
+        new DataOutputStream(new FileOutputStream(collectFile1, false));
+    int serialNum = 0;
+    for (int i = 0; i < 5; i++) {
+      // skip first 5 pipeData
+      PipeData pipeData = new TsFilePipeData("", serialNum++);
+      pipeData.serialize(pipeLogOutput);
+      collectOutput.writeInt(i);
+    }
+    List<PhysicalPlan> planList = new ArrayList<>();
+    planList.add(new SetStorageGroupPlan(new PartialPath("root.vehicle")));
+    planList.add(
+        new CreateTimeSeriesPlan(
+            new PartialPath("root.vehicle.d0.s0"),
+            new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.RLE)));
+    planList.add(
+        new CreateTimeSeriesPlan(
+            new PartialPath("root.vehicle.d0.s1"),
+            new MeasurementSchema("s1", TSDataType.TEXT, TSEncoding.PLAIN)));
+    planList.add(
+        new CreateTimeSeriesPlan(
+            new PartialPath("root.vehicle.d1.s2"),
+            new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE)));
+    planList.add(
+        new CreateTimeSeriesPlan(
+            new PartialPath("root.vehicle.d1.s3"),
+            new MeasurementSchema("s3", TSDataType.BOOLEAN, TSEncoding.PLAIN)));
+    planList.add(new SetStorageGroupPlan(new PartialPath("root.sg1")));
+    planList.add(
+        new CreateAlignedTimeSeriesPlan(
+            new PartialPath("root.sg1.d1"),
+            Arrays.asList("s1", "s2", "s3", "s4", "s5"),
+            Arrays.asList(
+                TSDataType.FLOAT,
+                TSDataType.INT32,
+                TSDataType.INT64,
+                TSDataType.BOOLEAN,
+                TSDataType.TEXT),
+            Arrays.asList(
+                TSEncoding.RLE,
+                TSEncoding.GORILLA,
+                TSEncoding.RLE,
+                TSEncoding.RLE,
+                TSEncoding.PLAIN),
+            Arrays.asList(
+                CompressionType.SNAPPY,
+                CompressionType.SNAPPY,
+                CompressionType.SNAPPY,
+                CompressionType.SNAPPY,
+                CompressionType.SNAPPY),
+            null));
+    for (PhysicalPlan plan : planList) {
+      PipeData pipeData = new SchemaPipeData(plan, serialNum++);
+      pipeData.serialize(pipeLogOutput);
+    }
+    pipeLogOutput.close();
+    collectOutput.close();
+    File pipeLog2 = new File(pipeLogDir1.getPath(), String.valueOf(System.currentTimeMillis() + 1));
+    pipeLogOutput = new DataOutputStream(new FileOutputStream(pipeLog2, false));
+    List<File> tsFiles = SyncTestUtil.getTsFilePaths(tmpDir);
+    for (File f : tsFiles) {
+      PipeData pipeData = new TsFilePipeData(f.getPath(), serialNum++);
+      pipeData.serialize(pipeLogOutput);
+    }
+    Deletion deletion = new Deletion(new PartialPath("root.vehicle.**"), 0, 33, 38);
+    PipeData pipeData = new DeletionPipeData(deletion, serialNum++);
+    pipeData.serialize(pipeLogOutput);
+    pipeLogOutput.close();
+
+    // 3. create and start collector
+    Collector collector = new Collector();
+    collector.startCollect();
+
+    // 4. start collect pipe
+    collector.startPipe(pipeName1, remoteIp1, createdTime1);
+
+    // 5. if all pipeData has been loaded into IoTDB, check result
+    CountDownLatch latch = new CountDownLatch(1);
+    ExecutorService es1 = Executors.newSingleThreadExecutor();
+    es1.execute(
+        () -> {
+          while (true) {
+            File[] files = pipeLogDir1.listFiles();
+            if (files.length == 0) {
+              break;
+            }
+          }
+          latch.countDown();
+        });
+    es1.shutdown();
+    try {
+      latch.await(30, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    String sql1 = "select * from root.vehicle.*";
+    String[] retArray1 =
+        new String[] {
+          "6,120,null,null,null",
+          "9,null,123,null,null",
+          "16,128,null,null,16.0",
+          "18,189,198,true,18.0",
+          "20,null,null,false,null",
+          "29,null,null,true,1205.0",
+          "99,null,1234,null,null"
+        };
+    String[] columnNames1 = {
+      "root.vehicle.d0.s0", "root.vehicle.d0.s1", "root.vehicle.d1.s3", "root.vehicle.d1.s2"
+    };
+    SyncTestUtil.checkResult(sql1, columnNames1, retArray1);
+    // 5.2 check aligned timeseries
+    String sql2 = "select * from root.sg1.d1";
+    String[] retArray2 =
+        new String[] {
+          "1,1.0,1,null,true,aligned_test1",
+          "2,2.0,2,null,null,aligned_test2",
+          "3,3.0,null,null,false,aligned_test3",
+          "4,4.0,4,null,true,aligned_test4",
+          "5,130000.0,130000,130000,false,aligned_unseq_test1",
+          "6,6.0,6,6,true,null",
+          "7,7.0,7,7,false,aligned_test7",
+          "8,8.0,8,8,null,aligned_test8",
+          "9,9.0,9,9,false,aligned_test9",
+        };
+    String[] columnNames2 = {
+      "root.sg1.d1.s1", "root.sg1.d1.s2", "root.sg1.d1.s3", "root.sg1.d1.s4", "root.sg1.d1.s5",
+    };
+    SyncTestUtil.checkResult(sql2, columnNames2, retArray2);
+
+    // 6. stop pipe, check interrupt thread
+    collector.stopPipe(pipeName1, remoteIp1, createdTime1);
+    Thread.sleep(1000);
+    Deletion deletion1 = new Deletion(new PartialPath("root.vehicle.**"), 0, 0, 99);
+    PipeData pipeData1 = new DeletionPipeData(deletion1, serialNum++);
+    BufferedPipeDataBlockingQueue.getMock().offer(pipeData1);
+    Thread.sleep(1000);
+    SyncTestUtil.checkResult(sql1, columnNames1, retArray1);
+
+    // 7. stop collector, check release thread pool
+    collector.stopCollect();
+    Thread.sleep(1000);
+    for (Thread t : Thread.getAllStackTraces().keySet()) {
+      if (t.getName().contains(ThreadName.SYNC_RECEIVER_COLLECTOR.getName())) {
+        Assert.fail();
+      }
+    }
+  }
+
+  @Test
+  public void testMultiplePipe() throws Exception {
+    // 1. restart IoTDB
+    EnvironmentUtils.cleanEnv();
+    EnvironmentUtils.envSetUp();
+
+    // 2. prepare pipelog and .collector to test fail recovery
+    if (!pipeLogDir1.exists()) {
+      pipeLogDir1.mkdirs();
+    }
+    if (!pipeLogDir2.exists()) {
+      pipeLogDir2.mkdirs();
+    }
+    // 2.1 prepare for pipe1
+    File pipeLog1 = new File(pipeLogDir1.getPath(), String.valueOf(System.currentTimeMillis()));
+    File collectFile1 = new File(pipeLog1.getPath() + SyncConstant.COLLECTOR_SUFFIX);
+    DataOutputStream pipeLogOutput = new DataOutputStream(new FileOutputStream(pipeLog1, false));
+    DataOutputStream collectOutput =
+        new DataOutputStream(new FileOutputStream(collectFile1, false));
+    int serialNum = 0;
+    for (int i = 0; i < 5; i++) {
+      // skip first 5 pipeData
+      PipeData pipeData = new TsFilePipeData("", serialNum++);
+      pipeData.serialize(pipeLogOutput);
+      collectOutput.writeInt(i);
+    }
+    List<PhysicalPlan> planList = new ArrayList<>();
+    planList.add(new SetStorageGroupPlan(new PartialPath("root.vehicle")));
+    planList.add(
+        new CreateTimeSeriesPlan(
+            new PartialPath("root.vehicle.d0.s0"),
+            new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.RLE)));
+    planList.add(
+        new CreateTimeSeriesPlan(
+            new PartialPath("root.vehicle.d0.s1"),
+            new MeasurementSchema("s1", TSDataType.TEXT, TSEncoding.PLAIN)));
+    planList.add(
+        new CreateTimeSeriesPlan(
+            new PartialPath("root.vehicle.d1.s2"),
+            new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE)));
+    planList.add(
+        new CreateTimeSeriesPlan(
+            new PartialPath("root.vehicle.d1.s3"),
+            new MeasurementSchema("s3", TSDataType.BOOLEAN, TSEncoding.PLAIN)));
+    for (PhysicalPlan plan : planList) {
+      PipeData pipeData = new SchemaPipeData(plan, serialNum++);
+      pipeData.serialize(pipeLogOutput);
+    }
+    pipeLogOutput.close();
+    collectOutput.close();
+    File pipeLog2 = new File(pipeLogDir1.getPath(), String.valueOf(System.currentTimeMillis() + 1));
+    pipeLogOutput = new DataOutputStream(new FileOutputStream(pipeLog2, false));
+    List<File> tsFiles =
+        SyncTestUtil.getTsFilePaths(new File(tmpDir, "sequence" + File.separator + "root.vehicle"));
+    for (File f : tsFiles) {
+      PipeData pipeData = new TsFilePipeData(f.getPath(), serialNum++);
+      pipeData.serialize(pipeLogOutput);
+    }
+    tsFiles =
+        SyncTestUtil.getTsFilePaths(
+            new File(tmpDir, "unsequence" + File.separator + "root.vehicle"));
+    for (File f : tsFiles) {
+      PipeData pipeData = new TsFilePipeData(f.getPath(), serialNum++);
+      pipeData.serialize(pipeLogOutput);
+    }
+    Deletion deletion = new Deletion(new PartialPath("root.vehicle.**"), 0, 33, 38);
+    PipeData pipeData = new DeletionPipeData(deletion, serialNum++);
+    pipeData.serialize(pipeLogOutput);
+    pipeLogOutput.close();
+
+    // 2.2 prepare for pipe2
+    serialNum = 0;
+    pipeLog1 = new File(pipeLogDir2.getPath(), String.valueOf(System.currentTimeMillis()));
+    pipeLogOutput = new DataOutputStream(new FileOutputStream(pipeLog1, false));
+    pipeData =
+        new SchemaPipeData(
+            new CreateAlignedTimeSeriesPlan(
+                new PartialPath("root.sg1.d1"),
+                Arrays.asList("s1", "s2", "s3", "s4", "s5"),
+                Arrays.asList(
+                    TSDataType.FLOAT,
+                    TSDataType.INT32,
+                    TSDataType.INT64,
+                    TSDataType.BOOLEAN,
+                    TSDataType.TEXT),
+                Arrays.asList(
+                    TSEncoding.RLE,
+                    TSEncoding.GORILLA,
+                    TSEncoding.RLE,
+                    TSEncoding.RLE,
+                    TSEncoding.PLAIN),
+                Arrays.asList(
+                    CompressionType.SNAPPY,
+                    CompressionType.SNAPPY,
+                    CompressionType.SNAPPY,
+                    CompressionType.SNAPPY,
+                    CompressionType.SNAPPY),
+                null),
+            serialNum++);
+    pipeData.serialize(pipeLogOutput);
+    pipeData =
+        new SchemaPipeData(new SetStorageGroupPlan(new PartialPath("root.sg1")), serialNum++);
+    pipeData.serialize(pipeLogOutput);
+    pipeLogOutput.close();
+    pipeLog2 = new File(pipeLogDir2.getPath(), String.valueOf(System.currentTimeMillis() + 1));
+    pipeLogOutput = new DataOutputStream(new FileOutputStream(pipeLog2, false));
+    tsFiles =
+        SyncTestUtil.getTsFilePaths(new File(tmpDir, "sequence" + File.separator + "root.sg1"));
+    for (File f : tsFiles) {
+      pipeData = new TsFilePipeData(f.getPath(), serialNum++);
+      pipeData.serialize(pipeLogOutput);
+    }
+    tsFiles =
+        SyncTestUtil.getTsFilePaths(new File(tmpDir, "unsequence" + File.separator + "root.sg1"));
+    for (File f : tsFiles) {
+      pipeData = new TsFilePipeData(f.getPath(), serialNum++);
+      pipeData.serialize(pipeLogOutput);
+    }
+    pipeLogOutput.close();
+
+    // 3. create and start collector
+    Collector collector = new Collector();
+    collector.startCollect();
+
+    // 4. start collect pipe
+    collector.startPipe(pipeName1, remoteIp1, createdTime1);
+    collector.startPipe(pipeName2, remoteIp2, createdTime2);
+
+    // 5. if all pipeData has been loaded into IoTDB, check result
+    CountDownLatch latch = new CountDownLatch(2);
+    ExecutorService es1 = Executors.newSingleThreadExecutor();
+    es1.execute(
+        () -> {
+          while (true) {
+            File[] files = pipeLogDir1.listFiles();
+            if (files.length == 0) {
+              break;
+            }
+          }
+          latch.countDown();
+          while (true) {
+            File[] files = pipeLogDir2.listFiles();
+            if (files.length == 0) {
+              break;
+            }
+          }
+          latch.countDown();
+        });
+    es1.shutdown();
+    try {
+      latch.await(30, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    String sql1 = "select * from root.vehicle.*";
+    String[] retArray1 =
+        new String[] {
+          "6,120,null,null,null",
+          "9,null,123,null,null",
+          "16,128,null,null,16.0",
+          "18,189,198,true,18.0",
+          "20,null,null,false,null",
+          "29,null,null,true,1205.0",
+          "99,null,1234,null,null"
+        };
+    String[] columnNames1 = {
+      "root.vehicle.d0.s0", "root.vehicle.d0.s1", "root.vehicle.d1.s3", "root.vehicle.d1.s2"
+    };
+    SyncTestUtil.checkResult(sql1, columnNames1, retArray1);
+    // 5.2 check aligned timeseries
+    String sql2 = "select * from root.sg1.d1";
+    String[] retArray2 =
+        new String[] {
+          "1,1.0,1,null,true,aligned_test1",
+          "2,2.0,2,null,null,aligned_test2",
+          "3,3.0,null,null,false,aligned_test3",
+          "4,4.0,4,null,true,aligned_test4",
+          "5,130000.0,130000,130000,false,aligned_unseq_test1",
+          "6,6.0,6,6,true,null",
+          "7,7.0,7,7,false,aligned_test7",
+          "8,8.0,8,8,null,aligned_test8",
+          "9,9.0,9,9,false,aligned_test9",
+        };
+    String[] columnNames2 = {
+      "root.sg1.d1.s1", "root.sg1.d1.s2", "root.sg1.d1.s3", "root.sg1.d1.s4", "root.sg1.d1.s5",
+    };
+    SyncTestUtil.checkResult(sql2, columnNames2, retArray2);
+
+    // 6. stop pipe, check interrupt thread
+    collector.stopPipe(pipeName1, remoteIp1, createdTime1);
+    collector.stopPipe(pipeName2, remoteIp2, createdTime2);
+    Thread.sleep(1000);
+    Deletion deletion1 = new Deletion(new PartialPath("root.vehicle.**"), 0, 0, 99);
+    PipeData pipeData1 = new DeletionPipeData(deletion1, serialNum++);
+    BufferedPipeDataBlockingQueue.getMock().offer(pipeData1);
+    Thread.sleep(1000);
+    SyncTestUtil.checkResult(sql1, columnNames1, retArray1);
+
+    // 7. stop collector, check release thread pool
+    collector.stopCollect();
+    Thread.sleep(1000);
+    for (Thread t : Thread.getAllStackTraces().keySet()) {
+      if (t.getName().contains(ThreadName.SYNC_RECEIVER_COLLECTOR.getName())) {
+        Assert.fail();
+      }
+    }
+  }
+}
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverLoaderIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverLoaderIT.java
index 3d96ccb..8b98f92 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverLoaderIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverLoaderIT.java
@@ -31,11 +31,9 @@ import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.itbase.category.LocalStandaloneTest;
-import org.apache.iotdb.jdbc.Config;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.utils.FilePathUtils;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.apache.commons.io.FileUtils;
@@ -48,13 +46,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.IOException;
 import java.sql.*;
 import java.util.*;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
 @Category({LocalStandaloneTest.class})
 public class IoTDBSyncReceiverLoaderIT {
   private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSyncReceiverLoaderIT.class);
@@ -76,7 +70,7 @@ public class IoTDBSyncReceiverLoaderIT {
     IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
     IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
     IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false);
-    WriteUtil.insertData();
+    SyncTestUtil.insertData();
     EnvironmentUtils.shutdownDaemon();
     File srcDir = new File(IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0]);
     FileUtils.moveDirectory(srcDir, tmpDir);
@@ -155,7 +149,7 @@ public class IoTDBSyncReceiverLoaderIT {
     }
 
     // 3. test for TsFileLoader
-    List<File> tsFiles = getTsFilePaths(tmpDir);
+    List<File> tsFiles = SyncTestUtil.getTsFilePaths(tmpDir);
     for (File tsfile : tsFiles) {
       ILoader tsFileLoader = new TsFileLoader(tsfile);
       try {
@@ -192,7 +186,7 @@ public class IoTDBSyncReceiverLoaderIT {
     String[] columnNames1 = {
       "root.vehicle.d0.s0", "root.vehicle.d0.s1", "root.vehicle.d1.s3", "root.vehicle.d1.s2"
     };
-    checkResult(sql1, columnNames1, retArray1);
+    SyncTestUtil.checkResult(sql1, columnNames1, retArray1);
     // 5.2 check aligned timeseries
     String sql2 = "select * from root.sg1.d1";
     String[] retArray2 =
@@ -210,97 +204,6 @@ public class IoTDBSyncReceiverLoaderIT {
     String[] columnNames2 = {
       "root.sg1.d1.s1", "root.sg1.d1.s2", "root.sg1.d1.s3", "root.sg1.d1.s4", "root.sg1.d1.s5",
     };
-    checkResult(sql2, columnNames2, retArray2);
-  }
-
-  private void checkResult(String sql, String[] columnNames, String[] retArray)
-      throws ClassNotFoundException {
-    Class.forName(Config.JDBC_DRIVER_NAME);
-    try (Connection connection =
-            DriverManager.getConnection(
-                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
-        Statement statement = connection.createStatement()) {
-      boolean hasResultSet = statement.execute(sql);
-      Assert.assertTrue(hasResultSet);
-      ResultSet resultSet = statement.getResultSet();
-      ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
-      Map<String, Integer> map = new HashMap<>();
-      for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
-        map.put(resultSetMetaData.getColumnName(i), i);
-      }
-      assertEquals(columnNames.length + 1, resultSetMetaData.getColumnCount());
-      int cnt = 0;
-      while (resultSet.next()) {
-        StringBuilder builder = new StringBuilder();
-        builder.append(resultSet.getString(1));
-        for (String columnName : columnNames) {
-          int index = map.get(columnName);
-          builder.append(",").append(resultSet.getString(index));
-        }
-        assertEquals(retArray[cnt], builder.toString());
-        cnt++;
-      }
-      assertEquals(retArray.length, cnt);
-    } catch (Exception e) {
-      e.printStackTrace();
-      fail(e.getMessage());
-    }
-  }
-
-  /**
-   * scan parentDir and return all TsFile sorted by load sequence
-   *
-   * @param parentDir folder to scan
-   */
-  private List<File> getTsFilePaths(File parentDir) {
-    List<File> res = new ArrayList<>();
-    if (!parentDir.exists()) {
-      Assert.fail();
-      return res;
-    }
-    scanDir(res, parentDir);
-    Collections.sort(
-        res,
-        new Comparator<File>() {
-          @Override
-          public int compare(File f1, File f2) {
-            int diffSg =
-                f1.getParentFile()
-                    .getParentFile()
-                    .getParentFile()
-                    .getName()
-                    .compareTo(f2.getParentFile().getParentFile().getParentFile().getName());
-            if (diffSg != 0) {
-              return diffSg;
-            } else {
-              return (int)
-                  (FilePathUtils.splitAndGetTsFileVersion(f1.getName())
-                      - FilePathUtils.splitAndGetTsFileVersion(f2.getName()));
-            }
-          }
-        });
-    return res;
-  }
-
-  private void scanDir(List<File> tsFiles, File parentDir) {
-    if (!parentDir.exists()) {
-      Assert.fail();
-      return;
-    }
-    File fa[] = parentDir.listFiles();
-    for (int i = 0; i < fa.length; i++) {
-      File fs = fa[i];
-      if (fs.isDirectory()) {
-        scanDir(tsFiles, fs);
-      } else if (fs.getName().endsWith(".resource")) {
-        // only add tsfile that has been flushed
-        tsFiles.add(new File(fs.getAbsolutePath().substring(0, fs.getAbsolutePath().length() - 9)));
-        try {
-          FileUtils.delete(fs);
-        } catch (IOException e) {
-          e.printStackTrace();
-        }
-      }
-    }
+    SyncTestUtil.checkResult(sql2, columnNames2, retArray2);
   }
 }
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/WriteUtil.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/SyncTestUtil.java
similarity index 61%
rename from integration/src/test/java/org/apache/iotdb/db/integration/sync/WriteUtil.java
rename to integration/src/test/java/org/apache/iotdb/db/integration/sync/SyncTestUtil.java
index 9d4da20..4498c11 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/WriteUtil.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/SyncTestUtil.java
@@ -19,12 +19,20 @@
 package org.apache.iotdb.db.integration.sync;
 
 import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.utils.FilePathUtils;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.Statement;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
 
-public class WriteUtil {
+import java.io.File;
+import java.io.IOException;
+import java.sql.*;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class SyncTestUtil {
 
   private static final String[] sqls =
       new String[] {
@@ -96,7 +104,94 @@ public class WriteUtil {
     }
   }
 
-  public static void main(String[] args) throws ClassNotFoundException {
-    insertData();
+  /**
+   * scan parentDir and return all TsFile sorted by load sequence
+   *
+   * @param parentDir folder to scan
+   */
+  public static List<File> getTsFilePaths(File parentDir) {
+    List<File> res = new ArrayList<>();
+    if (!parentDir.exists()) {
+      Assert.fail();
+      return res;
+    }
+    scanDir(res, parentDir);
+    Collections.sort(
+        res,
+        new Comparator<File>() {
+          @Override
+          public int compare(File f1, File f2) {
+            int diffSg =
+                f1.getParentFile()
+                    .getParentFile()
+                    .getParentFile()
+                    .getName()
+                    .compareTo(f2.getParentFile().getParentFile().getParentFile().getName());
+            if (diffSg != 0) {
+              return diffSg;
+            } else {
+              return (int)
+                  (FilePathUtils.splitAndGetTsFileVersion(f1.getName())
+                      - FilePathUtils.splitAndGetTsFileVersion(f2.getName()));
+            }
+          }
+        });
+    return res;
+  }
+
+  private static void scanDir(List<File> tsFiles, File parentDir) {
+    if (!parentDir.exists()) {
+      Assert.fail();
+      return;
+    }
+    File fa[] = parentDir.listFiles();
+    for (int i = 0; i < fa.length; i++) {
+      File fs = fa[i];
+      if (fs.isDirectory()) {
+        scanDir(tsFiles, fs);
+      } else if (fs.getName().endsWith(".resource")) {
+        // only add tsfile that has been flushed
+        tsFiles.add(new File(fs.getAbsolutePath().substring(0, fs.getAbsolutePath().length() - 9)));
+        try {
+          FileUtils.delete(fs);
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+
+  public static void checkResult(String sql, String[] columnNames, String[] retArray)
+      throws ClassNotFoundException {
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      boolean hasResultSet = statement.execute(sql);
+      Assert.assertTrue(hasResultSet);
+      ResultSet resultSet = statement.getResultSet();
+      ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+      Map<String, Integer> map = new HashMap<>();
+      for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+        map.put(resultSetMetaData.getColumnName(i), i);
+      }
+      assertEquals(columnNames.length + 1, resultSetMetaData.getColumnCount());
+      int cnt = 0;
+      while (resultSet.next()) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(resultSet.getString(1));
+        for (String columnName : columnNames) {
+          int index = map.get(columnName);
+          builder.append(",").append(resultSet.getString(index));
+        }
+        assertEquals(retArray[cnt], builder.toString());
+        cnt++;
+      }
+      assertEquals(retArray.length, cnt);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index 4028571..c407012 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -57,6 +57,7 @@ public enum ThreadName {
   TIMED_CLOSE_TSFILE("Timed-Close-TsFile"),
   SETTLE_SERVICE("Settle"),
   PIPE_SERVICE("Pipe"),
+  SYNC_RECEIVER_COLLECTOR("Sync-Collector"),
   CONTINUOUS_QUERY_SERVICE("ContinuousQueryTaskPoolManager"),
   CLUSTER_INFO_SERVICE("ClusterInfoClient"),
   CLUSTER_RPC_SERVICE("ClusterRPC"),
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index d43afa4..b0307ac 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -221,6 +221,10 @@ public class IoTDBConfig {
           + File.separator
           + IoTDBConstant.SYNC_FOLDER_NAME;
 
+  /** Sync directory, including the lock file, uuid file, device owner map */
+  // TODO: delete old syncDir and rename newSyncDir to syncDir
+  private String newSyncDir = DEFAULT_BASE_DIR + File.separator + IoTDBConstant.SYNC_FOLDER_NAME;
+
   /** Performance tracing directory, stores performance tracing files */
   private String tracingDir = DEFAULT_BASE_DIR + File.separator + IoTDBConstant.TRACING_FOLDER_NAME;
 
@@ -928,6 +932,7 @@ public class IoTDBConfig {
     systemDir = addHomeDir(systemDir);
     schemaDir = addHomeDir(schemaDir);
     syncDir = addHomeDir(syncDir);
+    newSyncDir = addHomeDir(newSyncDir);
     tracingDir = addHomeDir(tracingDir);
     walDir = addHomeDir(walDir);
     indexRootFolder = addHomeDir(indexRootFolder);
@@ -1110,6 +1115,14 @@ public class IoTDBConfig {
     this.syncDir = syncDir;
   }
 
+  public String getNewSyncDir() {
+    return newSyncDir;
+  }
+
+  void setNewSyncDir(String syncDir) {
+    this.newSyncDir = syncDir;
+  }
+
   public String getTracingDir() {
     return tracingDir;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/DeletionPipeData.java b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/DeletionPipeData.java
index 99d5638..b4e91c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/DeletionPipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/DeletionPipeData.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.newsync.pipedata;
 
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.newsync.receiver.load.DeletionLoader;
+import org.apache.iotdb.db.newsync.receiver.load.ILoader;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,16 +31,13 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.Objects;
 
 public class DeletionPipeData extends PipeData {
   private static final Logger logger = LoggerFactory.getLogger(DeletionPipeData.class);
 
   private Deletion deletion;
 
-  public DeletionPipeData(long serialNumber) {
-    super(serialNumber);
-  }
-
   public DeletionPipeData(Deletion deletion, long serialNumber) {
     super(serialNumber);
     this.deletion = deletion;
@@ -50,13 +49,20 @@ public class DeletionPipeData extends PipeData {
   }
 
   @Override
-  public long serializeImpl(DataOutputStream stream) throws IOException {
-    return deletion.serializeWithoutFileOffset(stream);
+  public long serialize(DataOutputStream stream) throws IOException {
+    return super.serialize(stream) + deletion.serializeWithoutFileOffset(stream);
+  }
+
+  public static DeletionPipeData deserialize(DataInputStream stream)
+      throws IOException, IllegalPathException {
+    long serialNumber = stream.readLong();
+    Deletion deletion = Deletion.deserializeWithoutFileOffset(stream);
+    return new DeletionPipeData(deletion, serialNumber);
   }
 
   @Override
-  public void deserializeImpl(DataInputStream stream) throws IOException, IllegalPathException {
-    this.deletion = Deletion.deserializeWithoutFileOffset(stream);
+  public ILoader createLoader() {
+    return new DeletionLoader(deletion);
   }
 
   @Override
@@ -79,4 +85,18 @@ public class DeletionPipeData extends PipeData {
   public String toString() {
     return "DeletionData{" + "deletion=" + deletion + ", serialNumber=" + serialNumber + '}';
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    DeletionPipeData that = (DeletionPipeData) o;
+    return Objects.equals(deletion, that.deletion)
+        && Objects.equals(serialNumber, that.serialNumber);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(deletion, serialNumber);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/PipeData.java b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/PipeData.java
index 0575d5c..2e16115 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/PipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/PipeData.java
@@ -20,12 +20,17 @@
 package org.apache.iotdb.db.newsync.pipedata;
 
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.newsync.receiver.load.ILoader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
 public abstract class PipeData {
+  private static final Logger logger = LoggerFactory.getLogger(PipeData.class);
 
   protected final long serialNumber;
 
@@ -53,39 +58,31 @@ public abstract class PipeData {
 
   public long serialize(DataOutputStream stream) throws IOException {
     long serializeSize = 0;
-    stream.writeLong(serialNumber);
-    serializeSize += Long.BYTES;
     stream.writeByte((byte) getType().ordinal());
     serializeSize += Byte.BYTES;
-    serializeSize += serializeImpl(stream);
+    stream.writeLong(serialNumber);
+    serializeSize += Long.BYTES;
     return serializeSize;
   }
 
-  public abstract long serializeImpl(DataOutputStream stream) throws IOException;
-
   public static PipeData deserialize(DataInputStream stream)
       throws IOException, IllegalPathException {
-    long serialNumber = stream.readLong();
     Type type = Type.values()[stream.readByte()];
-
-    PipeData pipeData = null;
     switch (type) {
       case TSFILE:
-        pipeData = new TsFilePipeData(serialNumber);
-        break;
+        return TsFilePipeData.deserialize(stream);
       case DELETION:
-        pipeData = new DeletionPipeData(serialNumber);
-        break;
+        return DeletionPipeData.deserialize(stream);
       case PHYSICALPLAN:
-        pipeData = new SchemaPipeData(serialNumber);
-        break;
+        return SchemaPipeData.deserialize(stream);
+      default:
+        logger.error("Deserialize PipeData error because Unknown type {}.", type);
+        throw new UnsupportedOperationException(
+            "Deserialize PipeData error because Unknown type " + type);
     }
-    pipeData.deserializeImpl(stream);
-    return pipeData;
   }
 
-  public abstract void deserializeImpl(DataInputStream stream)
-      throws IOException, IllegalPathException;
+  public abstract ILoader createLoader();
 
   public abstract void sendToTransport();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/SchemaPipeData.java b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/SchemaPipeData.java
index 28f068a..809a0e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/SchemaPipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/SchemaPipeData.java
@@ -20,22 +20,21 @@
 package org.apache.iotdb.db.newsync.pipedata;
 
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.newsync.receiver.load.ILoader;
+import org.apache.iotdb.db.newsync.receiver.load.SchemaLoader;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Objects;
 
 public class SchemaPipeData extends PipeData {
   private static final int SERIALIZE_BUFFER_SIZE = 1024;
 
   private PhysicalPlan plan;
 
-  public SchemaPipeData(long serialNumber) {
-    super(serialNumber);
-  }
-
   public SchemaPipeData(PhysicalPlan plan, long serialNumber) {
     super(serialNumber);
     this.plan = plan;
@@ -47,11 +46,13 @@ public class SchemaPipeData extends PipeData {
   }
 
   @Override
-  public long serializeImpl(DataOutputStream stream) throws IOException {
+  public long serialize(DataOutputStream stream) throws IOException {
+    long serializeSize = super.serialize(stream);
     byte[] bytes = getBytes();
     stream.writeInt(bytes.length);
     stream.write(bytes);
-    return Integer.BYTES + bytes.length;
+    serializeSize += (Integer.BYTES + bytes.length);
+    return serializeSize;
   }
 
   private byte[] getBytes() {
@@ -63,11 +64,18 @@ public class SchemaPipeData extends PipeData {
     return bytes;
   }
 
-  @Override
-  public void deserializeImpl(DataInputStream stream) throws IOException, IllegalPathException {
+  public static SchemaPipeData deserialize(DataInputStream stream)
+      throws IOException, IllegalPathException {
+    long serialNumber = stream.readLong();
     byte[] bytes = new byte[stream.readInt()];
     stream.read(bytes);
-    this.plan = PhysicalPlan.Factory.create(ByteBuffer.wrap(bytes));
+    PhysicalPlan plan = PhysicalPlan.Factory.create(ByteBuffer.wrap(bytes));
+    return new SchemaPipeData(plan, serialNumber);
+  }
+
+  @Override
+  public ILoader createLoader() {
+    return new SchemaLoader(plan);
   }
 
   @Override
@@ -80,4 +88,17 @@ public class SchemaPipeData extends PipeData {
   public String toString() {
     return "SchemaPipeData{" + "serialNumber=" + serialNumber + ", plan=" + plan + '}';
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    SchemaPipeData that = (SchemaPipeData) o;
+    return Objects.equals(plan, that.plan) && Objects.equals(serialNumber, that.serialNumber);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(plan, serialNumber);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/TsFilePipeData.java b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/TsFilePipeData.java
index 503486a..3ff101f 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/TsFilePipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/TsFilePipeData.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.newsync.pipedata;
 
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.newsync.receiver.load.ILoader;
+import org.apache.iotdb.db.newsync.receiver.load.TsFileLoader;
 import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
@@ -34,16 +36,13 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 public class TsFilePipeData extends PipeData {
   private static final Logger logger = LoggerFactory.getLogger(TsFilePipeData.class);
 
   private String tsFilePath;
 
-  public TsFilePipeData(long serialNumber) {
-    super(serialNumber);
-  }
-
   public TsFilePipeData(String tsFilePath, long serialNumber) {
     super(serialNumber);
     this.tsFilePath = tsFilePath;
@@ -55,13 +54,19 @@ public class TsFilePipeData extends PipeData {
   }
 
   @Override
-  public long serializeImpl(DataOutputStream stream) throws IOException {
-    return ReadWriteIOUtils.write(tsFilePath, stream);
+  public long serialize(DataOutputStream stream) throws IOException {
+    return super.serialize(stream) + ReadWriteIOUtils.write(tsFilePath, stream);
+  }
+
+  public static TsFilePipeData deserialize(DataInputStream stream) throws IOException {
+    long serialNumber = stream.readLong();
+    String tsFilePath = ReadWriteIOUtils.readString(stream);
+    return new TsFilePipeData(tsFilePath, serialNumber);
   }
 
   @Override
-  public void deserializeImpl(DataInputStream stream) throws IOException {
-    this.tsFilePath = ReadWriteIOUtils.readString(stream);
+  public ILoader createLoader() {
+    return new TsFileLoader(new File(tsFilePath));
   }
 
   @Override
@@ -125,4 +130,18 @@ public class TsFilePipeData extends PipeData {
         + '\''
         + '}';
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    TsFilePipeData pipeData = (TsFilePipeData) o;
+    return Objects.equals(tsFilePath, pipeData.tsFilePath)
+        && Objects.equals(serialNumber, pipeData.serialNumber);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(tsFilePath, serialNumber);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
index e85bc7c..3dcca71 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
@@ -22,7 +22,9 @@ import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.newsync.receiver.collector.Collector;
 import org.apache.iotdb.db.newsync.receiver.manager.PipeInfo;
+import org.apache.iotdb.db.newsync.receiver.manager.PipeStatus;
 import org.apache.iotdb.db.newsync.receiver.manager.ReceiverManager;
+import org.apache.iotdb.db.newsync.utils.SyncPathUtil;
 import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
 import org.apache.iotdb.db.qp.utils.DatetimeUtils;
 import org.apache.iotdb.db.query.dataset.ListDataSet;
@@ -34,10 +36,12 @@ import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.utils.Binary;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
@@ -53,7 +57,16 @@ public class ReceiverService implements IService {
   public boolean startPipeServer() {
     try {
       receiverManager.startServer();
-      // TODO: start socket and collector
+      collector.startCollect();
+      // recover started pipe
+      List<PipeInfo> pipeInfos = receiverManager.getAllPipeInfos();
+      for (PipeInfo pipeInfo : pipeInfos) {
+        if (pipeInfo.getStatus().equals(PipeStatus.RUNNING)) {
+          collector.startPipe(
+              pipeInfo.getPipeName(), pipeInfo.getRemoteIp(), pipeInfo.getCreateTime());
+        }
+      }
+      // TODO: start socket
     } catch (IOException e) {
       logger.error(e.getMessage());
       return false;
@@ -65,6 +78,7 @@ public class ReceiverService implements IService {
   public boolean stopPipeServer() {
     try {
       receiverManager.stopServer();
+      collector.stopCollect();
       // TODO: stop socket and collector
     } catch (IOException e) {
       logger.error(e.getMessage());
@@ -74,23 +88,41 @@ public class ReceiverService implements IService {
   }
 
   /** create and start a new pipe named pipeName */
-  public void createPipe(String pipeName, String remoteIp, long startTime) throws IOException {
-    receiverManager.createPipe(pipeName, remoteIp, startTime);
+  public void createPipe(String pipeName, String remoteIp, long createTime) throws IOException {
+    createDir(pipeName, remoteIp, createTime);
+    receiverManager.createPipe(pipeName, remoteIp, createTime);
+    collector.startPipe(pipeName, remoteIp, createTime);
   }
 
   /** start an existed pipe named pipeName */
-  public void startPipe(String pipeName, String remoteIp) throws IOException {
+  public void startPipe(String pipeName, String remoteIp, long createTime) throws IOException {
     receiverManager.startPipe(pipeName, remoteIp);
+    collector.startPipe(pipeName, remoteIp, createTime);
   }
 
   /** stop an existed pipe named pipeName */
-  public void stopPipe(String pipeName, String remoteIp) throws IOException {
+  public void stopPipe(String pipeName, String remoteIp, long createTime) throws IOException {
     receiverManager.stopPipe(pipeName, remoteIp);
+    collector.stopPipe(pipeName, remoteIp, createTime);
   }
 
   /** drop an existed pipe named pipeName */
-  public void dropPipe(String pipeName, String remoteIp) throws IOException {
+  public void dropPipe(String pipeName, String remoteIp, long createTime) throws IOException {
     receiverManager.dropPipe(pipeName, remoteIp);
+    collector.stopPipe(pipeName, remoteIp, createTime);
+    File dir = new File(SyncPathUtil.getReceiverPipeDir(pipeName, remoteIp, createTime));
+    FileUtils.deleteDirectory(dir);
+  }
+
+  private void createDir(String pipeName, String remoteIp, long createTime) {
+    File f = new File(SyncPathUtil.getReceiverFileDataDir(pipeName, remoteIp, createTime));
+    if (!f.exists()) {
+      f.mkdirs();
+    }
+    f = new File(SyncPathUtil.getReceiverPipeLogDir(pipeName, remoteIp, createTime));
+    if (!f.exists()) {
+      f.mkdirs();
+    }
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/collector/Collector.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/collector/Collector.java
index 3c44ed0..9321f5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/collector/Collector.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/collector/Collector.java
@@ -19,4 +19,190 @@
  */
 package org.apache.iotdb.db.newsync.receiver.collector;
 
-public class Collector {}
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.newsync.pipedata.PipeData;
+import org.apache.iotdb.db.newsync.utils.BufferedPipeDataBlockingQueue;
+import org.apache.iotdb.db.newsync.utils.SyncConstant;
+import org.apache.iotdb.db.newsync.utils.SyncPathUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/** scan sync receiver folder and load pipeData into IoTDB */
+public class Collector {
+
+  private static final Logger logger = LoggerFactory.getLogger(Collector.class);
+  private static final int WAIT_TIMEOUT = 2000;
+  private ExecutorService executorService;
+  private Map<String, Future> taskFutures;
+
+  public Collector() {
+    taskFutures = new ConcurrentHashMap<>();
+  }
+
+  public void startCollect() {
+    this.executorService =
+        IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.SYNC_RECEIVER_COLLECTOR.getName());
+  }
+
+  public void stopCollect() {
+    for (Future f : taskFutures.values()) {
+      f.cancel(true);
+    }
+    if (executorService != null) {
+      executorService.shutdownNow();
+      int totalWaitTime = WAIT_TIMEOUT;
+      while (!executorService.isTerminated()) {
+        try {
+          if (!executorService.awaitTermination(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) {
+            logger.info(
+                "{} thread pool doesn't exit after {}ms.",
+                ThreadName.SYNC_RECEIVER_COLLECTOR.getName(),
+                totalWaitTime);
+          }
+          totalWaitTime += WAIT_TIMEOUT;
+        } catch (InterruptedException e) {
+          logger.error(
+              "Interrupted while waiting {} thread pool to exit. ",
+              ThreadName.SYNC_RECEIVER_COLLECTOR.getName());
+          Thread.currentThread().interrupt();
+        }
+      }
+      executorService = null;
+    }
+  }
+
+  public void startPipe(String pipeName, String remoteIp, long createTime) {
+    String dir = SyncPathUtil.getReceiverPipeLogDir(pipeName, remoteIp, createTime);
+    ScanTask task = new ScanTask(dir);
+    taskFutures.put(dir, executorService.submit(task));
+  }
+
+  public void stopPipe(String pipeName, String remoteIp, long createTime) {
+    String dir = SyncPathUtil.getReceiverPipeLogDir(pipeName, remoteIp, createTime);
+    taskFutures.get(dir).cancel(true);
+    taskFutures.remove(dir);
+  }
+
+  private static BufferedPipeDataBlockingQueue parsePipeLogToBlockingQueue(File file)
+      throws IOException {
+    BufferedPipeDataBlockingQueue blockingQueue =
+        new BufferedPipeDataBlockingQueue(file.getAbsolutePath(), 100);
+    DataInputStream inputStream = new DataInputStream(new FileInputStream(file));
+    try {
+      while (true) {
+        blockingQueue.offer(PipeData.deserialize(inputStream));
+      }
+    } catch (EOFException e) {
+      logger.info(String.format("Finish parsing pipeLog %s.", file.getPath()));
+    } catch (IllegalPathException e) {
+      logger.error(String.format("Parsing pipeLog %s error, because %s", file.getPath(), e));
+      throw new IOException(e);
+    } finally {
+      inputStream.close();
+    }
+    blockingQueue.end();
+    return blockingQueue;
+  }
+
+  private class ScanTask implements Runnable {
+    private final String scanPath;
+
+    private ScanTask(String dirPath) {
+      scanPath = dirPath;
+    }
+
+    @Override
+    public void run() {
+      try {
+        while (!Thread.interrupted()) {
+          File dir = new File(scanPath);
+          if (dir.exists() && dir.isDirectory()) {
+            BufferedPipeDataBlockingQueue pipeDataQueue;
+            File[] files = dir.listFiles((d, s) -> !s.endsWith(SyncConstant.COLLECTOR_SUFFIX));
+            int nextIndex = 0;
+
+            if (files.length > 0) {
+              // read from disk
+              // TODO: Assuming that the file name is incremented by number
+              Arrays.sort(files, Comparator.comparingLong(o -> Long.parseLong(o.getName())));
+              try {
+                pipeDataQueue = parsePipeLogToBlockingQueue(files[0]);
+              } catch (IOException e) {
+                logger.error("Parse pipe data log {} error.", files[0].getPath());
+                // TODO: stop
+                return;
+              }
+            } else {
+              // read from buffer
+              // TODO: get buffer from transport, this is mock implement
+              pipeDataQueue = BufferedPipeDataBlockingQueue.getMock();
+            }
+
+            File recordFile = new File(pipeDataQueue.getFileName() + SyncConstant.COLLECTOR_SUFFIX);
+            if (recordFile.exists()) {
+              RandomAccessFile raf = new RandomAccessFile(recordFile, "r");
+              if (raf.length() > Integer.BYTES) {
+                raf.seek(raf.length() - Integer.BYTES);
+                nextIndex = raf.readInt() + 1;
+              }
+              raf.close();
+            }
+            DataOutputStream outputStream =
+                new DataOutputStream(new FileOutputStream(recordFile, true));
+            while (!pipeDataQueue.isEnd()) {
+              PipeData pipeData = null;
+              try {
+                pipeData = pipeDataQueue.take();
+              } catch (InterruptedException e) {
+                outputStream.close();
+                Thread.currentThread().interrupt();
+              }
+              int currentIndex = pipeDataQueue.getAndIncreaseIndex();
+              if (currentIndex < nextIndex) {
+                continue;
+              }
+              try {
+                logger.info(
+                    "Start load pipeData with serialize number {} and type {}",
+                    pipeData.getSerialNumber(),
+                    pipeData.getType());
+                pipeData.createLoader().load();
+                outputStream.writeInt(currentIndex);
+              } catch (Exception e) {
+                // TODO: how to response error message to sender?
+                // TODO: should drop this pipe?
+                logger.error(
+                    "Cannot load pipeData with serialize number {} and type {}, because {}",
+                    pipeData.getSerialNumber(),
+                    pipeData.getType(),
+                    e.getMessage());
+                break;
+              }
+            }
+            outputStream.close();
+            // if all success loaded, remove pipelog and record file
+            File pipeLog = new File(pipeDataQueue.getFileName());
+            if (pipeLog.exists()) {
+              Files.deleteIfExists(pipeLog.toPath());
+              Files.deleteIfExists(Paths.get(files[0].getPath() + SyncConstant.COLLECTOR_SUFFIX));
+            }
+          }
+        }
+      } catch (IOException e) {
+        logger.error(e.getMessage());
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManager.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManager.java
index 4dd33e3..dde1aac 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManager.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.newsync.receiver.manager;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.newsync.receiver.recovery.ReceiverLog;
 import org.apache.iotdb.db.newsync.receiver.recovery.ReceiverLogAnalyzer;
-import org.apache.iotdb.db.service.ServiceType;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,16 +38,10 @@ public class ReceiverManager {
   private ReceiverLog log;
 
   public void init() throws StartupException {
-    try {
-      log = new ReceiverLog();
-    } catch (IOException e) {
-      logger.error(e.getMessage());
-      throw new StartupException(
-          ServiceType.RECEIVER_SERVICE.getName(), "cannot create receiver log");
-    }
     ReceiverLogAnalyzer.scan();
     pipeInfoMap = ReceiverLogAnalyzer.getPipeInfoMap();
     pipeServerEnable = ReceiverLogAnalyzer.isPipeServerEnable();
+    log = new ReceiverLog();
   }
 
   public void close() throws IOException {
@@ -65,36 +58,28 @@ public class ReceiverManager {
     pipeServerEnable = false;
   }
 
-  public void createPipe(String pipeName, String remoteIp, long startTime) throws IOException {
-    if (log != null) {
-      log.createPipe(pipeName, remoteIp, startTime);
-    }
+  public void createPipe(String pipeName, String remoteIp, long createTime) throws IOException {
+    log.createPipe(pipeName, remoteIp, createTime);
     if (!pipeInfoMap.containsKey(pipeName)) {
       pipeInfoMap.put(pipeName, new HashMap<>());
     }
     pipeInfoMap
         .get(pipeName)
-        .put(remoteIp, new PipeInfo(pipeName, remoteIp, PipeStatus.RUNNING, startTime));
+        .put(remoteIp, new PipeInfo(pipeName, remoteIp, PipeStatus.RUNNING, createTime));
   }
 
   public void startPipe(String pipeName, String remoteIp) throws IOException {
-    if (log != null) {
-      log.startPipe(pipeName, remoteIp);
-    }
+    log.startPipe(pipeName, remoteIp);
     pipeInfoMap.get(pipeName).get(remoteIp).setStatus(PipeStatus.RUNNING);
   }
 
   public void stopPipe(String pipeName, String remoteIp) throws IOException {
-    if (log != null) {
-      log.stopPipe(pipeName, remoteIp);
-    }
+    log.stopPipe(pipeName, remoteIp);
     pipeInfoMap.get(pipeName).get(remoteIp).setStatus(PipeStatus.PAUSE);
   }
 
   public void dropPipe(String pipeName, String remoteIp) throws IOException {
-    if (log != null) {
-      log.dropPipe(pipeName, remoteIp);
-    }
+    log.dropPipe(pipeName, remoteIp);
     pipeInfoMap.get(pipeName).get(remoteIp).setStatus(PipeStatus.DROP);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLog.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLog.java
index 55427e1..c0dc7b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLog.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLog.java
@@ -18,9 +18,9 @@
  */
 package org.apache.iotdb.db.newsync.receiver.recovery;
 
-import org.apache.iotdb.db.newsync.conf.SyncConstant;
 import org.apache.iotdb.db.newsync.receiver.manager.PipeStatus;
-import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
+import org.apache.iotdb.db.newsync.utils.SyncConstant;
+import org.apache.iotdb.db.newsync.utils.SyncPathUtil;
 
 import java.io.BufferedWriter;
 import java.io.File;
@@ -30,8 +30,8 @@ import java.io.IOException;
 public class ReceiverLog {
   private BufferedWriter bw;
 
-  public ReceiverLog() throws IOException {
-    File logFile = new File(SenderConf.sysDir, SyncConstant.RECEIVER_LOG_NAME);
+  public void init() throws IOException {
+    File logFile = new File(SyncPathUtil.getSysDir(), SyncConstant.RECEIVER_LOG_NAME);
     if (!logFile.getParentFile().exists()) {
       logFile.getParentFile().mkdirs();
     }
@@ -39,12 +39,18 @@ public class ReceiverLog {
   }
 
   public void startPipeServer() throws IOException {
+    if (bw == null) {
+      init();
+    }
     bw.write("on");
     bw.newLine();
     bw.flush();
   }
 
   public void stopPipeServer() throws IOException {
+    if (bw == null) {
+      init();
+    }
     bw.write("off");
     bw.newLine();
     bw.flush();
@@ -68,12 +74,18 @@ public class ReceiverLog {
 
   private void writeLog(String pipeName, String remoteIp, PipeStatus status, long time)
       throws IOException {
+    if (bw == null) {
+      init();
+    }
     bw.write(String.format("%s,%s,%s,%d", pipeName, remoteIp, status, time));
     bw.newLine();
     bw.flush();
   }
 
   private void writeLog(String pipeName, String remoteIp, PipeStatus status) throws IOException {
+    if (bw == null) {
+      init();
+    }
     bw.write(String.format("%s,%s,%s", pipeName, remoteIp, status));
     bw.newLine();
     bw.flush();
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzer.java
index 50e6724..5ffe24c 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzer.java
@@ -19,10 +19,10 @@
 package org.apache.iotdb.db.newsync.receiver.recovery;
 
 import org.apache.iotdb.db.exception.StartupException;
-import org.apache.iotdb.db.newsync.conf.SyncConstant;
 import org.apache.iotdb.db.newsync.receiver.manager.PipeInfo;
 import org.apache.iotdb.db.newsync.receiver.manager.PipeStatus;
-import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
+import org.apache.iotdb.db.newsync.utils.SyncConstant;
+import org.apache.iotdb.db.newsync.utils.SyncPathUtil;
 import org.apache.iotdb.db.service.ServiceType;
 
 import org.slf4j.Logger;
@@ -43,7 +43,7 @@ public class ReceiverLogAnalyzer {
     logger.info("Start to recover all sync state for sync receiver.");
     pipeInfoMap = new HashMap<>();
     pipeServerEnable = false;
-    File logFile = new File(SenderConf.sysDir, SyncConstant.RECEIVER_LOG_NAME);
+    File logFile = new File(SyncPathUtil.getSysDir(), SyncConstant.RECEIVER_LOG_NAME);
     try (BufferedReader loadReader = new BufferedReader(new FileReader(logFile))) {
       String line;
       int lineNum = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogAnalyzer.java
index d24598f..a182023 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogAnalyzer.java
@@ -173,6 +173,8 @@ public class TsFilePipeLogAnalyzer {
     } catch (IllegalPathException e) {
       logger.error(String.format("Parsing pipeLog %s error, because %s", file.getPath(), e));
       throw new IOException(e);
+    } finally {
+      inputStream.close();
     }
     return pipeData;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/utils/BufferedPipeDataBlockingQueue.java b/server/src/main/java/org/apache/iotdb/db/newsync/utils/BufferedPipeDataBlockingQueue.java
new file mode 100644
index 0000000..537bc4e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/utils/BufferedPipeDataBlockingQueue.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.iotdb.db.newsync.utils;
+
+import org.apache.iotdb.db.newsync.pipedata.PipeData;
+
+import java.io.File;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+public class BufferedPipeDataBlockingQueue {
+  // TODO: delete mock if unnecessary
+  private static BufferedPipeDataBlockingQueue mock;
+  private String fileName;
+  private BlockingQueue<PipeData> pipeDataQueue;
+  private int currentIndex;
+  private boolean end;
+
+  public BufferedPipeDataBlockingQueue(String fileName, int blockingQueueCapacity) {
+    this.fileName = fileName;
+    this.currentIndex = 0;
+    this.end = false;
+    pipeDataQueue = new ArrayBlockingQueue<>(blockingQueueCapacity);
+  }
+
+  public String getFileName() {
+    return fileName;
+  }
+
+  public int getAndIncreaseIndex() {
+    return currentIndex++;
+  }
+
+  public boolean offer(PipeData pipeData) {
+    return pipeDataQueue.offer(pipeData);
+  }
+
+  public PipeData take() throws InterruptedException {
+    return pipeDataQueue.take();
+  }
+
+  public void end() {
+    this.end = true;
+  }
+
+  public boolean isEnd() {
+    return end && pipeDataQueue.isEmpty();
+  }
+
+  // TODO: delete mock if unnecessary
+  public static BufferedPipeDataBlockingQueue getMock() {
+    if (mock == null) {
+      String mockPath = SyncPathUtil.getReceiverPipeLogDir("mock", "127.0.0.1", 0);
+      File file = new File(mockPath);
+      if (!file.exists()) {
+        file.mkdirs();
+      }
+      mock = new BufferedPipeDataBlockingQueue(mockPath + File.separator + "100", 100);
+    }
+    return mock;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncConstant.java b/server/src/main/java/org/apache/iotdb/db/newsync/utils/SyncConstant.java
similarity index 73%
rename from server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncConstant.java
rename to server/src/main/java/org/apache/iotdb/db/newsync/utils/SyncConstant.java
index 9a6d0d5..ecf0bd3 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/utils/SyncConstant.java
@@ -16,8 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.newsync.conf;
+package org.apache.iotdb.db.newsync.utils;
 
 public class SyncConstant {
+  public static final String SYNC_SYS_DIR = "sys";
+  public static final String RECEIVER_DIR = "receiver";
   public static final String RECEIVER_LOG_NAME = "receiverService.log";
+  public static final String PIPELOG_DIR_NAME = "pipe-log";
+  public static final String FILEDATA_DIR_NAME = "file-data";
+  public static final String COLLECTOR_SUFFIX = ".collector";
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/utils/SyncPathUtil.java b/server/src/main/java/org/apache/iotdb/db/newsync/utils/SyncPathUtil.java
new file mode 100644
index 0000000..f57a080
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/utils/SyncPathUtil.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.utils;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import java.io.File;
+
+/** Util for path generation in sync module */
+public class SyncPathUtil {
+  public static String getReceiverPipeLogDir(String pipeName, String remoteIp, long createTime) {
+    return getReceiverPipeDir(pipeName, remoteIp, createTime)
+        + File.separator
+        + SyncConstant.PIPELOG_DIR_NAME;
+  }
+
+  public static String getReceiverFileDataDir(String pipeName, String remoteIp, long createTime) {
+    return getReceiverPipeDir(pipeName, remoteIp, createTime)
+        + File.separator
+        + SyncConstant.FILEDATA_DIR_NAME;
+  }
+
+  public static String getReceiverPipeDir(String pipeName, String remoteIp, long createTime) {
+    return getReceiverDir()
+        + File.separator
+        + String.format("%s-%d-%s", pipeName, createTime, remoteIp);
+  }
+
+  public static String getReceiverDir() {
+    return IoTDBDescriptor.getInstance().getConfig().getNewSyncDir()
+        + File.separator
+        + SyncConstant.RECEIVER_DIR;
+  }
+
+  public static String getSysDir() {
+    return IoTDBDescriptor.getInstance().getConfig().getNewSyncDir()
+        + File.separator
+        + SyncConstant.SYNC_SYS_DIR;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index 4aa910e..2a82e6e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -193,9 +193,7 @@ public abstract class Operator {
     PRUNE_TEMPLATE,
     APPEND_TEMPLATE,
     DROP_TEMPLATE,
-
     SHOW_QUERY_RESOURCE,
-
     CREATE_PIPESINK,
     DROP_PIPESINK,
     SHOW_PIPESINK,
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 0a35a93..fd6eb0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -249,6 +249,8 @@ public class EnvironmentUtils {
     cleanDir(config.getUdfDir());
     // delete tlog
     cleanDir(config.getTriggerDir());
+    // delete sync dir
+    cleanDir(config.getNewSyncDir());
     // delete data files
     for (String dataDir : config.getDataDirs()) {
       cleanDir(dataDir);
diff --git a/server/src/test/java/org/apache/iotdb/db/newsync/pipedata/PipeDataTest.java b/server/src/test/java/org/apache/iotdb/db/newsync/pipedata/PipeDataTest.java
new file mode 100644
index 0000000..ed9d79f
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/newsync/pipedata/PipeDataTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.pipedata;
+
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+public class PipeDataTest {
+  private static final Logger logger = LoggerFactory.getLogger(PipeDataTest.class);
+  private static final String pipeLogPath = "target/pipelog";
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.envSetUp();
+  }
+
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    EnvironmentUtils.cleanEnv();
+    Files.deleteIfExists(Paths.get(pipeLogPath));
+  }
+
+  @Test
+  public void testSerializeAndDeserialize() {
+    try {
+      File f1 = new File(pipeLogPath);
+      File f2 = new File(pipeLogPath);
+      PipeData pipeData1 = new TsFilePipeData("1", 1);
+      Deletion deletion = new Deletion(new PartialPath("root.sg1.d1.s1"), 0, 1, 5);
+      PipeData pipeData2 = new DeletionPipeData(deletion, 3);
+      PhysicalPlan plan = new SetStorageGroupPlan(new PartialPath("root.sg1"));
+      PipeData pipeData3 = new SchemaPipeData(plan, 2);
+      DataOutputStream outputStream = new DataOutputStream(new FileOutputStream(f2));
+      pipeData1.serialize(outputStream);
+      outputStream.flush();
+      DataInputStream inputStream = new DataInputStream(new FileInputStream(f1));
+      Assert.assertEquals(pipeData1, PipeData.deserialize(inputStream));
+      pipeData2.serialize(outputStream);
+      outputStream.flush();
+      Assert.assertEquals(pipeData2, PipeData.deserialize(inputStream));
+      pipeData3.serialize(outputStream);
+      outputStream.flush();
+      Assert.assertEquals(pipeData3, PipeData.deserialize(inputStream));
+      inputStream.close();
+      outputStream.close();
+    } catch (Exception e) {
+      logger.error(e.getMessage());
+      Assert.fail();
+    }
+  }
+}