You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/01/25 03:20:21 UTC

[iotdb] branch new_sync updated: [To new_sync][IOTDB-2272] implement customized sync process: receiver (#4786)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch new_sync
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/new_sync by this push:
     new 5e62a81  [To new_sync][IOTDB-2272] implement customized sync process: receiver (#4786)
5e62a81 is described below

commit 5e62a81e21bc61fc9428fb68c1b97115ca40ea0f
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Tue Jan 25 11:19:42 2022 +0800

    [To new_sync][IOTDB-2272] implement customized sync process: receiver (#4786)
---
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4    |   4 +
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  16 +-
 .../sync/IoTDBSyncReceiverLoaderIT.java            | 306 +++++++++++++++++++++
 .../iotdb/db/integration/sync/WriteUtil.java       | 102 +++++++
 .../resources/conf/iotdb-engine.properties         |   6 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  11 +
 .../org/apache/iotdb/db/conf/IoTDBConstant.java    |   2 +
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   5 +
 .../apache/iotdb/db/newsync/conf/SyncConstant.java |  23 ++
 .../iotdb/db/newsync/receiver/ReceiverService.java | 177 ++++++++++++
 .../db/newsync/receiver/collector/Collector.java   |  22 ++
 .../db/newsync/receiver/load/DeletionLoader.java   |  44 +++
 .../iotdb/db/newsync/receiver/load/ILoader.java    |  36 +++
 .../db/newsync/receiver/load/SchemaLoader.java     |  54 ++++
 .../db/newsync/receiver/load/TsFileLoader.java     |  67 +++++
 .../db/newsync/receiver/manager/PipeInfo.java      |  83 ++++++
 .../db/newsync/receiver/manager/PipeStatus.java    |  25 ++
 .../newsync/receiver/manager/ReceiverManager.java  | 132 +++++++++
 .../db/newsync/receiver/recovery/ReceiverLog.java  |  85 ++++++
 .../receiver/recovery/ReceiverLogAnalyzer.java     | 106 +++++++
 .../apache/iotdb/db/qp/constant/SQLConstant.java   |   6 +
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  20 ++
 .../org/apache/iotdb/db/qp/logical/Operator.java   |   6 +-
 .../db/qp/logical/sys/ShowPipeServerOperator.java  |  51 ++++
 .../db/qp/logical/sys/StartPipeServerOperator.java |  38 +++
 .../db/qp/logical/sys/StopPipeServerOperator.java  |  38 +++
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |  10 +
 .../db/qp/physical/sys/ShowPipeServerPlan.java     |  41 +++
 .../apache/iotdb/db/qp/physical/sys/ShowPlan.java  |   3 +-
 .../db/qp/physical/sys/StartPipeServerPlan.java    |  56 ++++
 .../db/qp/physical/sys/StopPipeServerPlan.java     |  56 ++++
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |  19 ++
 .../java/org/apache/iotdb/db/service/IoTDB.java    |   2 +
 .../org/apache/iotdb/db/service/ServiceType.java   |   1 +
 .../receiver/manager/ReceiverManagerTest.java      |  81 ++++++
 .../receiver/recovery/ReceiverLogAnalyzerTest.java |  82 ++++++
 .../iotdb/db/qp/physical/PhysicalPlanTest.java     |  34 +++
 37 files changed, 1845 insertions(+), 5 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4
index 6b7398d..f406677 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4
@@ -362,6 +362,10 @@ PIPE
     : P I P E
     ;
 
+PIPESERVER
+    : P I P E S E R V E R
+    ;
+
 PIPESINK
     : P I P E S I N K
     ;
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 01982b7..24dbc79 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -63,7 +63,8 @@ utilityStatement
     | loadConfiguration | loadTimeseries | loadFile | removeFile | unloadFile;
 
 syncStatement
-    : createPipeSink | showPipeSinkType | showPipeSink | dropPipeSink
+    : startPipeServer | stopPipeServer | showPipeServer
+    | createPipeSink | showPipeSinkType | showPipeSink | dropPipeSink
     | createPipe | showPipe | stopPipe | startPipe | dropPipe;
 
 /**
@@ -706,6 +707,19 @@ syncAttributeClauses
     : propertyClause (COMMA propertyClause)*
     ;
 
+// sync receiver
+startPipeServer
+    : START PIPESERVER
+    ;
+
+stopPipeServer
+    : STOP PIPESERVER
+    ;
+
+showPipeServer
+    : SHOW PIPESERVER (pipeName=ID)?
+    ;
+
 /**
  * 7. Common Clauses
  */
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverLoaderIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverLoaderIT.java
new file mode 100644
index 0000000..3d96ccb
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverLoaderIT.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration.sync;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.newsync.receiver.load.DeletionLoader;
+import org.apache.iotdb.db.newsync.receiver.load.ILoader;
+import org.apache.iotdb.db.newsync.receiver.load.SchemaLoader;
+import org.apache.iotdb.db.newsync.receiver.load.TsFileLoader;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.itbase.category.LocalStandaloneTest;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.*;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@Category({LocalStandaloneTest.class})
+public class IoTDBSyncReceiverLoaderIT {
+  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSyncReceiverLoaderIT.class);
+  protected static boolean enableSeqSpaceCompaction;
+  protected static boolean enableUnseqSpaceCompaction;
+  protected static boolean enableCrossSpaceCompaction;
+  /** create tsfile and move to tmpDir for sync test */
+  File tmpDir = new File("target/synctest");
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.envSetUp();
+    enableSeqSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
+    enableUnseqSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+    enableCrossSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
+    IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
+    IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
+    IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false);
+    WriteUtil.insertData();
+    EnvironmentUtils.shutdownDaemon();
+    File srcDir = new File(IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0]);
+    FileUtils.moveDirectory(srcDir, tmpDir);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+    FileUtils.deleteDirectory(tmpDir);
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void test() throws Exception {
+    // 1. restart IoTDB
+    EnvironmentUtils.cleanEnv();
+    EnvironmentUtils.envSetUp();
+
+    // 2. test for SchemaLoader
+    List<PhysicalPlan> planList = new ArrayList<>();
+    planList.add(new SetStorageGroupPlan(new PartialPath("root.vehicle")));
+    planList.add(
+        new CreateTimeSeriesPlan(
+            new PartialPath("root.vehicle.d0.s0"),
+            new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.RLE)));
+    planList.add(
+        new CreateTimeSeriesPlan(
+            new PartialPath("root.vehicle.d0.s1"),
+            new MeasurementSchema("s1", TSDataType.TEXT, TSEncoding.PLAIN)));
+    planList.add(
+        new CreateTimeSeriesPlan(
+            new PartialPath("root.vehicle.d1.s2"),
+            new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE)));
+    planList.add(
+        new CreateTimeSeriesPlan(
+            new PartialPath("root.vehicle.d1.s3"),
+            new MeasurementSchema("s3", TSDataType.BOOLEAN, TSEncoding.PLAIN)));
+    planList.add(new SetStorageGroupPlan(new PartialPath("root.sg1")));
+    planList.add(
+        new CreateAlignedTimeSeriesPlan(
+            new PartialPath("root.sg1.d1"),
+            Arrays.asList("s1", "s2", "s3", "s4", "s5"),
+            Arrays.asList(
+                TSDataType.FLOAT,
+                TSDataType.INT32,
+                TSDataType.INT64,
+                TSDataType.BOOLEAN,
+                TSDataType.TEXT),
+            Arrays.asList(
+                TSEncoding.RLE,
+                TSEncoding.GORILLA,
+                TSEncoding.RLE,
+                TSEncoding.RLE,
+                TSEncoding.PLAIN),
+            Arrays.asList(
+                CompressionType.SNAPPY,
+                CompressionType.SNAPPY,
+                CompressionType.SNAPPY,
+                CompressionType.SNAPPY,
+                CompressionType.SNAPPY),
+            null));
+    for (PhysicalPlan plan : planList) {
+      ILoader planLoader = new SchemaLoader(plan);
+      try {
+        planLoader.load();
+      } catch (Exception e) {
+        e.printStackTrace();
+        Assert.fail();
+      }
+    }
+
+    // 3. test for TsFileLoader
+    List<File> tsFiles = getTsFilePaths(tmpDir);
+    for (File tsfile : tsFiles) {
+      ILoader tsFileLoader = new TsFileLoader(tsfile);
+      try {
+        tsFileLoader.load();
+      } catch (Exception e) {
+        e.printStackTrace();
+        Assert.fail();
+      }
+    }
+
+    // 4. test for DeletionPlanLoader
+    Deletion deletion = new Deletion(new PartialPath("root.vehicle.**"), 0, 33, 38);
+    ILoader deletionLoader = new DeletionLoader(deletion);
+    try {
+      deletionLoader.load();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+
+    // 5. check result after loading
+    // 5.1 check normal timeseries
+    String sql1 = "select * from root.vehicle.*";
+    String[] retArray1 =
+        new String[] {
+          "6,120,null,null,null",
+          "9,null,123,null,null",
+          "16,128,null,null,16.0",
+          "18,189,198,true,18.0",
+          "20,null,null,false,null",
+          "29,null,null,true,1205.0",
+          "99,null,1234,null,null"
+        };
+    String[] columnNames1 = {
+      "root.vehicle.d0.s0", "root.vehicle.d0.s1", "root.vehicle.d1.s3", "root.vehicle.d1.s2"
+    };
+    checkResult(sql1, columnNames1, retArray1);
+    // 5.2 check aligned timeseries
+    String sql2 = "select * from root.sg1.d1";
+    String[] retArray2 =
+        new String[] {
+          "1,1.0,1,null,true,aligned_test1",
+          "2,2.0,2,null,null,aligned_test2",
+          "3,3.0,null,null,false,aligned_test3",
+          "4,4.0,4,null,true,aligned_test4",
+          "5,130000.0,130000,130000,false,aligned_unseq_test1",
+          "6,6.0,6,6,true,null",
+          "7,7.0,7,7,false,aligned_test7",
+          "8,8.0,8,8,null,aligned_test8",
+          "9,9.0,9,9,false,aligned_test9",
+        };
+    String[] columnNames2 = {
+      "root.sg1.d1.s1", "root.sg1.d1.s2", "root.sg1.d1.s3", "root.sg1.d1.s4", "root.sg1.d1.s5",
+    };
+    checkResult(sql2, columnNames2, retArray2);
+  }
+
+  private void checkResult(String sql, String[] columnNames, String[] retArray)
+      throws ClassNotFoundException {
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      boolean hasResultSet = statement.execute(sql);
+      Assert.assertTrue(hasResultSet);
+      ResultSet resultSet = statement.getResultSet();
+      ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+      Map<String, Integer> map = new HashMap<>();
+      for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+        map.put(resultSetMetaData.getColumnName(i), i);
+      }
+      assertEquals(columnNames.length + 1, resultSetMetaData.getColumnCount());
+      int cnt = 0;
+      while (resultSet.next()) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(resultSet.getString(1));
+        for (String columnName : columnNames) {
+          int index = map.get(columnName);
+          builder.append(",").append(resultSet.getString(index));
+        }
+        assertEquals(retArray[cnt], builder.toString());
+        cnt++;
+      }
+      assertEquals(retArray.length, cnt);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * scan parentDir and return all TsFile sorted by load sequence
+   *
+   * @param parentDir folder to scan
+   */
+  private List<File> getTsFilePaths(File parentDir) {
+    List<File> res = new ArrayList<>();
+    if (!parentDir.exists()) {
+      Assert.fail();
+      return res;
+    }
+    scanDir(res, parentDir);
+    Collections.sort(
+        res,
+        new Comparator<File>() {
+          @Override
+          public int compare(File f1, File f2) {
+            int diffSg =
+                f1.getParentFile()
+                    .getParentFile()
+                    .getParentFile()
+                    .getName()
+                    .compareTo(f2.getParentFile().getParentFile().getParentFile().getName());
+            if (diffSg != 0) {
+              return diffSg;
+            } else {
+              return (int)
+                  (FilePathUtils.splitAndGetTsFileVersion(f1.getName())
+                      - FilePathUtils.splitAndGetTsFileVersion(f2.getName()));
+            }
+          }
+        });
+    return res;
+  }
+
+  private void scanDir(List<File> tsFiles, File parentDir) {
+    if (!parentDir.exists()) {
+      Assert.fail();
+      return;
+    }
+    File fa[] = parentDir.listFiles();
+    for (int i = 0; i < fa.length; i++) {
+      File fs = fa[i];
+      if (fs.isDirectory()) {
+        scanDir(tsFiles, fs);
+      } else if (fs.getName().endsWith(".resource")) {
+        // only add tsfile that has been flushed
+        tsFiles.add(new File(fs.getAbsolutePath().substring(0, fs.getAbsolutePath().length() - 9)));
+        try {
+          FileUtils.delete(fs);
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+}
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/WriteUtil.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/WriteUtil.java
new file mode 100644
index 0000000..9d4da20
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/WriteUtil.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration.sync;
+
+import org.apache.iotdb.jdbc.Config;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+
+public class WriteUtil {
+
+  private static final String[] sqls =
+      new String[] {
+        "SET STORAGE GROUP TO root.vehicle",
+        "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+        "CREATE TIMESERIES root.vehicle.d0.s1 WITH DATATYPE=TEXT, ENCODING=PLAIN",
+        "CREATE TIMESERIES root.vehicle.d1.s2 WITH DATATYPE=FLOAT, ENCODING=RLE",
+        "CREATE TIMESERIES root.vehicle.d1.s3 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+        "SET STORAGE GROUP TO root.sg1",
+        "create aligned timeseries root.sg1.d1(s1 FLOAT encoding=RLE, s2 INT32 encoding=Gorilla compression=SNAPPY, s3 INT64, s4 BOOLEAN, s5 TEXT)",
+        "insert into root.sg1.d1(time, s1, s2, s3, s4, s5) aligned values(1, 1.0, 1, 1, TRUE, 'aligned_test1')",
+        "insert into root.sg1.d1(time, s1, s2, s3, s5) aligned values(2, 2.0, 2, 2, 'aligned_test2')",
+        "insert into root.sg1.d1(time, s1, s3, s4, s5) aligned values(3, 3.0, 3, FALSE, 'aligned_test3')",
+        "insert into root.sg1.d1(time, s1, s2, s4, s5) aligned values(4, 4.0, 4, TRUE, 'aligned_test4')",
+        "insert into root.sg1.d1(time, s1, s2, s4, s5) aligned values(5, 5.0, 5, TRUE, 'aligned_test5')",
+        "insert into root.sg1.d1(time, s1, s2, s3, s4) aligned values(6, 6.0, 6, 6, TRUE)",
+        "insert into root.sg1.d1(time, s1, s2, s3, s4, s5) aligned values(7, 7.0, 7, 7, FALSE, 'aligned_test7')",
+        "insert into root.sg1.d1(time, s1, s2, s3, s5) aligned values(8, 8.0, 8, 8, 'aligned_test8')",
+        "insert into root.sg1.d1(time, s1, s2, s3, s4, s5) aligned values(9, 9.0, 9, 9, FALSE, 'aligned_test9')",
+        "insert into root.vehicle.d0(timestamp,s0) values(10,10)",
+        "insert into root.vehicle.d0(timestamp,s0,s1) values(12,12,'12')",
+        "insert into root.vehicle.d0(timestamp,s1) values(14,'14')",
+        "insert into root.vehicle.d1(timestamp,s2) values(16,16.0)",
+        "insert into root.vehicle.d1(timestamp,s2,s3) values(18,18.0,true)",
+        "insert into root.vehicle.d1(timestamp,s3) values(20,false)",
+        "flush",
+        "insert into root.vehicle.d0(timestamp,s0) values(6,120)",
+        "insert into root.vehicle.d0(timestamp,s0,s1) values(38,121,'122')",
+        "insert into root.vehicle.d0(timestamp,s1) values(9,'123')",
+        "insert into root.vehicle.d0(timestamp,s0) values(16,128)",
+        "insert into root.sg1.d1(time, s1, s2, s3, s4, s5) aligned values(5, 130000.0, 130000, 130000, FALSE, 'aligned_unseq_test1')",
+        "flush",
+        "insert into root.vehicle.d0(timestamp,s0,s1) values(18,189,'198')",
+        "insert into root.vehicle.d0(timestamp,s1) values(99,'1234')",
+        "insert into root.vehicle.d1(timestamp,s2) values(14,1024.0)",
+        "insert into root.vehicle.d1(timestamp,s2,s3) values(29,1205.0,true)",
+        "insert into root.vehicle.d1(timestamp,s3) values(33,true)",
+        "delete from root.sg1.d1.s3 where time<=3",
+        "flush",
+        "delete from root.vehicle.** where time >= 10 and time<=14",
+        "flush",
+        // no flush data
+        "insert into root.sg1.d1(time, s1, s3, s4) aligned values(23, 230000.0, 230000, FALSE)",
+        "insert into root.sg1.d1(time, s2, s5) aligned values(31, 31, 'aligned_test31')",
+        "insert into root.sg1.d1(time, s2, s5) aligned values(32, 32, 'aligned_test32')",
+        "insert into root.sg1.d1(time, s2, s5) aligned values(33, 33, 'aligned_test33')",
+        "insert into root.sg1.d1(time, s2, s5) aligned values(34, 34, 'aligned_test34')",
+        "insert into root.sg1.d1(time, s2, s5) aligned values(35, 35, 'aligned_test35')",
+        "insert into root.sg1.d1(time, s2, s5) aligned values(36, 36, 'aligned_test36')",
+        "insert into root.sg1.d1(time, s2, s5) aligned values(37, 37, 'aligned_test37')",
+        "insert into root.sg1.d1(time, s2, s5) aligned values(38, 38, 'aligned_test38')",
+        "insert into root.sg1.d1(time, s2, s5) aligned values(39, 39, 'aligned_test39')",
+        "insert into root.sg1.d1(time, s2, s5) aligned values(40, 40, 'aligned_test40')",
+      };
+
+  public static void insertData() throws ClassNotFoundException {
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      // create aligned and non-aligned time series
+      for (String sql : sqls) {
+        statement.execute(sql);
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  public static void main(String[] args) throws ClassNotFoundException {
+    insertData();
+  }
+}
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index b1a9bf4..d76605e 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -568,6 +568,12 @@ timestamp_precision=ms
 # Datatype: int
 # external_sort_threshold=1000
 
+####################
+### PIPE Server Configuration
+####################
+# PIPE server port to listen
+# Datatype: int
+# local_pipe_server_port=5555
 
 ####################
 ### Sync Server Configuration
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index befc91d..d43afa4 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -469,6 +469,9 @@ public class IoTDBConfig {
   /** If this IoTDB instance is a receiver of sync, set the server port. */
   private int syncServerPort = 5555;
 
+  /** If this IoTDB instance is a receiver of sync, set the server port. */
+  private int pipeServerPort = 5555;
+
   /**
    * Set the language version when loading file including error information, default value is "EN"
    */
@@ -1283,6 +1286,14 @@ public class IoTDBConfig {
     this.syncServerPort = syncServerPort;
   }
 
+  public int getPipeServerPort() {
+    return pipeServerPort;
+  }
+
+  public void setPipeServerPort(int pipeServerPort) {
+    this.pipeServerPort = pipeServerPort;
+  }
+
   String getLanguageVersion() {
     return languageVersion;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index b449ef3..f0f939c 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -131,6 +131,8 @@ public class IoTDBConstant {
   public static final String COLUMN_PIPE2PIPESINK_NAME = "pipeSink";
   public static final String COLUMN_PIPE_STATUS = "status";
 
+  // sync receiver
+  public static final String COLUMN_PIPE_REMOTE_IP = "remote ip";
   public static final String ONE_LEVEL_PATH_WILDCARD = "*";
   public static final String MULTI_LEVEL_PATH_WILDCARD = "**";
   public static final String TIME = "time";
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index f46671b..2c3a80b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -379,6 +379,11 @@ public class IoTDBDescriptor {
               properties
                   .getProperty("sync_server_port", Integer.toString(conf.getSyncServerPort()))
                   .trim()));
+      conf.setSyncServerPort(
+          Integer.parseInt(
+              properties
+                  .getProperty("local_pipe_server_port", Integer.toString(conf.getSyncServerPort()))
+                  .trim()));
 
       conf.setIpWhiteList(properties.getProperty("ip_white_list", conf.getIpWhiteList()));
 
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncConstant.java b/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncConstant.java
new file mode 100644
index 0000000..9a6d0d5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncConstant.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.conf;
+
+public class SyncConstant {
+  public static final String RECEIVER_LOG_NAME = "receiverService.log";
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
new file mode 100644
index 0000000..e85bc7c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.receiver;
+
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.newsync.receiver.collector.Collector;
+import org.apache.iotdb.db.newsync.receiver.manager.PipeInfo;
+import org.apache.iotdb.db.newsync.receiver.manager.ReceiverManager;
+import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
+import org.apache.iotdb.db.qp.utils.DatetimeUtils;
+import org.apache.iotdb.db.query.dataset.ListDataSet;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.*;
+
+public class ReceiverService implements IService {
+  private static final Logger logger = LoggerFactory.getLogger(ReceiverService.class);
+  private static final ReceiverManager receiverManager = ReceiverManager.getInstance();
+  private Collector collector;
+
+  /** start receiver service */
+  public boolean startPipeServer() {
+    try {
+      receiverManager.startServer();
+      // TODO: start socket and collector
+    } catch (IOException e) {
+      logger.error(e.getMessage());
+      return false;
+    }
+    return true;
+  }
+
+  /** stop receiver service */
+  public boolean stopPipeServer() {
+    try {
+      receiverManager.stopServer();
+      // TODO: stop socket and collector
+    } catch (IOException e) {
+      logger.error(e.getMessage());
+      return false;
+    }
+    return true;
+  }
+
+  /** create and start a new pipe named pipeName */
+  public void createPipe(String pipeName, String remoteIp, long startTime) throws IOException {
+    receiverManager.createPipe(pipeName, remoteIp, startTime);
+  }
+
+  /** start an existed pipe named pipeName */
+  public void startPipe(String pipeName, String remoteIp) throws IOException {
+    receiverManager.startPipe(pipeName, remoteIp);
+  }
+
+  /** stop an existed pipe named pipeName */
+  public void stopPipe(String pipeName, String remoteIp) throws IOException {
+    receiverManager.stopPipe(pipeName, remoteIp);
+  }
+
+  /** drop an existed pipe named pipeName */
+  public void dropPipe(String pipeName, String remoteIp) throws IOException {
+    receiverManager.dropPipe(pipeName, remoteIp);
+  }
+
+  /**
+   * query by sql SHOW PIPE
+   *
+   * @return QueryDataSet contained three columns: pipe name, status and start time
+   */
+  public QueryDataSet showPipe(ShowPipeServerPlan plan) {
+    ListDataSet dataSet =
+        new ListDataSet(
+            Arrays.asList(
+                new PartialPath(COLUMN_PIPE_NAME, false),
+                new PartialPath(COLUMN_PIPE_REMOTE_IP, false),
+                new PartialPath(COLUMN_PIPE_STATUS, false),
+                new PartialPath(COLUMN_CREATED_TIME, false)),
+            Arrays.asList(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
+    List<PipeInfo> pipeInfos;
+    if (!StringUtils.isEmpty(plan.getPipeName())) {
+      pipeInfos = receiverManager.getPipeInfos(plan.getPipeName());
+    } else {
+      pipeInfos = receiverManager.getAllPipeInfos();
+    }
+    for (PipeInfo pipeInfo : pipeInfos) {
+      putPipeRecord(dataSet, pipeInfo);
+    }
+    return dataSet;
+  }
+
+  private void putPipeRecord(ListDataSet dataSet, PipeInfo pipeInfo) {
+    RowRecord rowRecord = new RowRecord(0);
+    Field pipeNameField = new Field(TSDataType.TEXT);
+    Field pipeRemoteIp = new Field(TSDataType.TEXT);
+    Field pipeStatusField = new Field(TSDataType.TEXT);
+    Field pipeCreateTimeField = new Field(TSDataType.TEXT);
+    pipeNameField.setBinaryV(new Binary(pipeInfo.getPipeName()));
+    pipeRemoteIp.setBinaryV(new Binary(pipeInfo.getRemoteIp()));
+    pipeStatusField.setBinaryV(new Binary(pipeInfo.getStatus().name()));
+    pipeCreateTimeField.setBinaryV(
+        new Binary(DatetimeUtils.convertLongToDate(pipeInfo.getCreateTime())));
+    rowRecord.addField(pipeNameField);
+    rowRecord.addField(pipeRemoteIp);
+    rowRecord.addField(pipeStatusField);
+    rowRecord.addField(pipeCreateTimeField);
+    dataSet.putRecord(rowRecord);
+  }
+
+  private ReceiverService() {
+    collector = new Collector();
+  }
+
+  public static ReceiverService getInstance() {
+    return ReceiverServiceHolder.INSTANCE;
+  }
+
+  /** IService * */
+  @Override
+  public void start() throws StartupException {
+    receiverManager.init();
+    if (receiverManager.isPipeServerEnable()) {
+      startPipeServer();
+    }
+  }
+
+  @Override
+  public void stop() {
+    stopPipeServer();
+    try {
+      receiverManager.close();
+    } catch (IOException e) {
+      logger.error(e.getMessage());
+    }
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.RECEIVER_SERVICE;
+  }
+
+  private static class ReceiverServiceHolder {
+    private static final ReceiverService INSTANCE = new ReceiverService();
+
+    private ReceiverServiceHolder() {}
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/collector/Collector.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/collector/Collector.java
new file mode 100644
index 0000000..3c44ed0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/collector/Collector.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.iotdb.db.newsync.receiver.collector;
+
+public class Collector {}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/DeletionLoader.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/DeletionLoader.java
new file mode 100644
index 0000000..b12c0cb
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/DeletionLoader.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.receiver.load;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.StorageEngineReadonlyException;
+
+/** This loader is used to load deletion plan. */
+public class DeletionLoader implements ILoader {
+
+  private Deletion deletion;
+
+  public DeletionLoader(Deletion deletion) {
+    this.deletion = deletion;
+  }
+
+  @Override
+  public void load() throws StorageEngineException {
+    if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
+      throw new StorageEngineReadonlyException();
+    }
+    StorageEngine.getInstance()
+        .delete(deletion.getPath(), deletion.getStartTime(), deletion.getEndTime(), 0, null);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/ILoader.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/ILoader.java
new file mode 100644
index 0000000..05be693
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/ILoader.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.receiver.load;
+
+import org.apache.iotdb.db.exception.LoadFileException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+
+import java.io.IOException;
+
+/**
+ * This interface is used to load files, including tsFile, syncTask, schema, modsFile and
+ * deletePlan.
+ */
+public interface ILoader {
+  void load()
+      throws StorageEngineException, IOException, MetadataException, WriteProcessException,
+          LoadFileException;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/SchemaLoader.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/SchemaLoader.java
new file mode 100644
index 0000000..1547b5a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/SchemaLoader.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.receiver.load;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * This loader is used to PhysicalPlan. Support four types of physical plans: CREATE_TIMESERIES |
+ * CREATE_ALIGNED_TIMESERIES | DELETE_TIMESERIES | SET_STORAGE_GROUP
+ */
+public class SchemaLoader implements ILoader {
+  private static final Logger logger = LoggerFactory.getLogger(SchemaLoader.class);
+
+  private PhysicalPlan plan;
+
+  public SchemaLoader(PhysicalPlan plan) {
+    this.plan = plan;
+  }
+
+  @Override
+  public void load() throws IOException, MetadataException {
+    try {
+      MManager.getInstance().operation(plan);
+    } catch (StorageGroupAlreadySetException e) {
+      logger.warn(
+          "Sync receiver try to set storage group {} that has already been set",
+          e.getStorageGroupPath());
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/TsFileLoader.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/TsFileLoader.java
new file mode 100644
index 0000000..1ee57d2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/TsFileLoader.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.receiver.load;
+
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.LoadFileException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.tools.TsFileRewriteTool;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** This loader is used to load tsFiles. If .mods file exists, it will be loaded as well. */
+public class TsFileLoader implements ILoader {
+
+  private File tsFile;
+
+  public TsFileLoader(File tsFile) {
+    this.tsFile = tsFile;
+  }
+
+  @Override
+  public void load()
+      throws IOException, MetadataException, WriteProcessException, StorageEngineException,
+          LoadFileException {
+    TsFileResource tsFileResource = new TsFileResource(tsFile);
+    tsFileResource.setClosed(true);
+    FileLoaderUtils.checkTsFileResource(tsFileResource);
+    List<TsFileResource> splitResources = new ArrayList();
+    if (tsFileResource.isSpanMultiTimePartitions()) {
+      TsFileRewriteTool.rewriteTsFile(tsFileResource, splitResources);
+      tsFileResource.writeLock();
+      tsFileResource.removeModFile();
+      tsFileResource.writeUnlock();
+    }
+
+    if (splitResources.isEmpty()) {
+      splitResources.add(tsFileResource);
+    }
+
+    for (TsFileResource resource : splitResources) {
+      StorageEngine.getInstance().loadNewTsFile(resource);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/PipeInfo.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/PipeInfo.java
new file mode 100644
index 0000000..535d662
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/PipeInfo.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.receiver.manager;
+
+import java.util.Objects;
+
+public class PipeInfo {
+  private String pipeName;
+  private PipeStatus status;
+  private String remoteIp;
+  private long createTime;
+
+  public PipeInfo(String pipeName, String remoteIp, PipeStatus status, long createTime) {
+    this.pipeName = pipeName;
+    this.remoteIp = remoteIp;
+    this.status = status;
+    this.createTime = createTime;
+  }
+
+  public String getPipeName() {
+    return pipeName;
+  }
+
+  public void setPipeName(String pipeName) {
+    this.pipeName = pipeName;
+  }
+
+  public PipeStatus getStatus() {
+    return status;
+  }
+
+  public void setStatus(PipeStatus status) {
+    this.status = status;
+  }
+
+  public long getCreateTime() {
+    return createTime;
+  }
+
+  public void setCreateTime(long createTime) {
+    this.createTime = createTime;
+  }
+
+  public String getRemoteIp() {
+    return remoteIp;
+  }
+
+  public void setRemoteIp(String remoteIp) {
+    this.remoteIp = remoteIp;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    PipeInfo pipeInfo = (PipeInfo) o;
+    return createTime == pipeInfo.createTime
+        && Objects.equals(pipeName, pipeInfo.pipeName)
+        && status == pipeInfo.status
+        && Objects.equals(remoteIp, pipeInfo.remoteIp);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(pipeName, status, remoteIp, createTime);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/PipeStatus.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/PipeStatus.java
new file mode 100644
index 0000000..b38df8d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/PipeStatus.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.receiver.manager;
+
+public enum PipeStatus {
+  RUNNING,
+  PAUSE,
+  DROP
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManager.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManager.java
new file mode 100644
index 0000000..4dd33e3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManager.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.receiver.manager;
+
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.newsync.receiver.recovery.ReceiverLog;
+import org.apache.iotdb.db.newsync.receiver.recovery.ReceiverLogAnalyzer;
+import org.apache.iotdb.db.service.ServiceType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+public class ReceiverManager {
+
+  private static final Logger logger = LoggerFactory.getLogger(ReceiverManager.class);
+
+  private boolean pipeServerEnable;
+  // <pipeName, <remoteIp, pipeInfo>>
+  private Map<String, Map<String, PipeInfo>> pipeInfoMap;
+  private ReceiverLog log;
+
+  public void init() throws StartupException {
+    try {
+      log = new ReceiverLog();
+    } catch (IOException e) {
+      logger.error(e.getMessage());
+      throw new StartupException(
+          ServiceType.RECEIVER_SERVICE.getName(), "cannot create receiver log");
+    }
+    ReceiverLogAnalyzer.scan();
+    pipeInfoMap = ReceiverLogAnalyzer.getPipeInfoMap();
+    pipeServerEnable = ReceiverLogAnalyzer.isPipeServerEnable();
+  }
+
+  public void close() throws IOException {
+    log.close();
+  }
+
+  public void startServer() throws IOException {
+    log.startPipeServer();
+    pipeServerEnable = true;
+  }
+
+  public void stopServer() throws IOException {
+    log.stopPipeServer();
+    pipeServerEnable = false;
+  }
+
+  public void createPipe(String pipeName, String remoteIp, long startTime) throws IOException {
+    if (log != null) {
+      log.createPipe(pipeName, remoteIp, startTime);
+    }
+    if (!pipeInfoMap.containsKey(pipeName)) {
+      pipeInfoMap.put(pipeName, new HashMap<>());
+    }
+    pipeInfoMap
+        .get(pipeName)
+        .put(remoteIp, new PipeInfo(pipeName, remoteIp, PipeStatus.RUNNING, startTime));
+  }
+
+  public void startPipe(String pipeName, String remoteIp) throws IOException {
+    if (log != null) {
+      log.startPipe(pipeName, remoteIp);
+    }
+    pipeInfoMap.get(pipeName).get(remoteIp).setStatus(PipeStatus.RUNNING);
+  }
+
+  public void stopPipe(String pipeName, String remoteIp) throws IOException {
+    if (log != null) {
+      log.stopPipe(pipeName, remoteIp);
+    }
+    pipeInfoMap.get(pipeName).get(remoteIp).setStatus(PipeStatus.PAUSE);
+  }
+
+  public void dropPipe(String pipeName, String remoteIp) throws IOException {
+    if (log != null) {
+      log.dropPipe(pipeName, remoteIp);
+    }
+    pipeInfoMap.get(pipeName).get(remoteIp).setStatus(PipeStatus.DROP);
+  }
+
+  public List<PipeInfo> getPipeInfos(String pipeName) {
+    return new ArrayList<>(pipeInfoMap.get(pipeName).values());
+  }
+
+  public List<PipeInfo> getAllPipeInfos() {
+    List<PipeInfo> res = new ArrayList<>();
+    for (String pipeName : pipeInfoMap.keySet()) {
+      res.addAll(pipeInfoMap.get(pipeName).values());
+    }
+    return res;
+  }
+
+  public boolean isPipeServerEnable() {
+    return pipeServerEnable;
+  }
+
+  public void setPipeServerEnable(boolean pipeServerEnable) {
+    this.pipeServerEnable = pipeServerEnable;
+  }
+
+  public static ReceiverManager getInstance() {
+    return ReceiverMonitorHolder.INSTANCE;
+  }
+
+  private ReceiverManager() {}
+
+  private static class ReceiverMonitorHolder {
+    private static final ReceiverManager INSTANCE = new ReceiverManager();
+
+    private ReceiverMonitorHolder() {}
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLog.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLog.java
new file mode 100644
index 0000000..55427e1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLog.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.receiver.recovery;
+
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.receiver.manager.PipeStatus;
+import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+public class ReceiverLog {
+  private BufferedWriter bw;
+
+  public ReceiverLog() throws IOException {
+    File logFile = new File(SenderConf.sysDir, SyncConstant.RECEIVER_LOG_NAME);
+    if (!logFile.getParentFile().exists()) {
+      logFile.getParentFile().mkdirs();
+    }
+    bw = new BufferedWriter(new FileWriter(logFile, true));
+  }
+
+  public void startPipeServer() throws IOException {
+    bw.write("on");
+    bw.newLine();
+    bw.flush();
+  }
+
+  public void stopPipeServer() throws IOException {
+    bw.write("off");
+    bw.newLine();
+    bw.flush();
+  }
+
+  public void createPipe(String pipeName, String remoteIp, long time) throws IOException {
+    writeLog(pipeName, remoteIp, PipeStatus.RUNNING, time);
+  }
+
+  public void startPipe(String pipeName, String remoteIp) throws IOException {
+    writeLog(pipeName, remoteIp, PipeStatus.RUNNING);
+  }
+
+  public void stopPipe(String pipeName, String remoteIp) throws IOException {
+    writeLog(pipeName, remoteIp, PipeStatus.PAUSE);
+  }
+
+  public void dropPipe(String pipeName, String remoteIp) throws IOException {
+    writeLog(pipeName, remoteIp, PipeStatus.DROP);
+  }
+
+  private void writeLog(String pipeName, String remoteIp, PipeStatus status, long time)
+      throws IOException {
+    bw.write(String.format("%s,%s,%s,%d", pipeName, remoteIp, status, time));
+    bw.newLine();
+    bw.flush();
+  }
+
+  private void writeLog(String pipeName, String remoteIp, PipeStatus status) throws IOException {
+    bw.write(String.format("%s,%s,%s", pipeName, remoteIp, status));
+    bw.newLine();
+    bw.flush();
+  }
+
+  public void close() throws IOException {
+    bw.close();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzer.java
new file mode 100644
index 0000000..50e6724
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.receiver.recovery;
+
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.receiver.manager.PipeInfo;
+import org.apache.iotdb.db.newsync.receiver.manager.PipeStatus;
+import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
+import org.apache.iotdb.db.service.ServiceType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ReceiverLogAnalyzer {
+  private static final Logger logger = LoggerFactory.getLogger(ReceiverLogAnalyzer.class);
+  private static boolean pipeServerEnable =
+      false; // record recovery result of receiver server status
+  private static Map<String, Map<String, PipeInfo>> pipeInfoMap =
+      new HashMap<>(); // record recovery result of receiver server status
+
+  public static void scan() throws StartupException {
+    logger.info("Start to recover all sync state for sync receiver.");
+    pipeInfoMap = new HashMap<>();
+    pipeServerEnable = false;
+    File logFile = new File(SenderConf.sysDir, SyncConstant.RECEIVER_LOG_NAME);
+    try (BufferedReader loadReader = new BufferedReader(new FileReader(logFile))) {
+      String line;
+      int lineNum = 0;
+      while ((line = loadReader.readLine()) != null) {
+        lineNum++;
+        try {
+          analyzeLog(line);
+        } catch (Exception e) {
+          logger.error("Receiver log recovery error: log file parse error at line " + lineNum);
+          logger.error(e.getMessage());
+          throw new StartupException(
+              ServiceType.RECEIVER_SERVICE.getName(), "log file recover error at line " + lineNum);
+        }
+      }
+    } catch (IOException e) {
+      logger.info("Receiver log file not found");
+    }
+  }
+
+  public static boolean isPipeServerEnable() {
+    return pipeServerEnable;
+  }
+
+  public static Map<String, Map<String, PipeInfo>> getPipeInfoMap() {
+    return pipeInfoMap;
+  }
+
+  /**
+   * parse log line and load result
+   *
+   * @param logLine log line
+   */
+  private static void analyzeLog(String logLine) {
+    if (logLine.equals("on")) {
+      pipeServerEnable = true;
+    } else if (logLine.equals("off")) {
+      pipeServerEnable = false;
+    } else {
+      String[] items = logLine.split(",");
+      String pipeName = items[0];
+      String remoteIp = items[1];
+      PipeStatus status = PipeStatus.valueOf(items[2]);
+      if (status == PipeStatus.RUNNING) {
+        if (!pipeInfoMap.containsKey(pipeName)) {
+          pipeInfoMap.put(pipeName, new HashMap<>());
+        }
+        if (items.length == 4) {
+          // create
+          pipeInfoMap
+              .get(pipeName)
+              .put(remoteIp, new PipeInfo(pipeName, remoteIp, status, Long.parseLong(items[3])));
+        } else {
+          pipeInfoMap.get(pipeName).get(remoteIp).setStatus(status);
+        }
+      } else {
+        pipeInfoMap.get(pipeName).get(remoteIp).setStatus(status);
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
index 19c8715..9e3026e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
@@ -201,6 +201,9 @@ public class SQLConstant {
   public static final int TOK_STOP_PIPE = 206;
   public static final int TOK_START_PIPE = 207;
   public static final int TOK_DROP_PIPE = 208;
+  public static final int TOK_SHOW_PIPE_SERVER = 209;
+  public static final int TOK_PIPE_SERVER_START = 210;
+  public static final int TOK_PIPE_SERVER_STOP = 211;
 
   public static final Map<Integer, String> tokenNames = new HashMap<>();
 
@@ -291,6 +294,9 @@ public class SQLConstant {
     tokenNames.put(TOK_STOP_PIPE, "TOK_STOP_PIPE");
     tokenNames.put(TOK_START_PIPE, "TOK_START_PIPE");
     tokenNames.put(TOK_DROP_PIPE, "TOK_DROP_PIPE");
+    tokenNames.put(TOK_SHOW_PIPE_SERVER, "TOK_SHOW_PIPE_SERVER");
+    tokenNames.put(TOK_PIPE_SERVER_START, "TOK_PIPE_SERVER_START");
+    tokenNames.put(TOK_PIPE_SERVER_STOP, "TOK_PIPE_SERVER_STOP");
   }
 
   public static boolean isReservedPath(PartialPath pathStr) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index f4b3c75..2946cd9 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -58,6 +58,7 @@ import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.utils.MetaUtils;
+import org.apache.iotdb.db.newsync.receiver.ReceiverService;
 import org.apache.iotdb.db.newsync.sender.pipe.Pipe;
 import org.apache.iotdb.db.newsync.sender.pipe.PipeSink;
 import org.apache.iotdb.db.newsync.sender.service.SenderService;
@@ -124,6 +125,7 @@ import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowFunctionsPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowLockInfoPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPipePlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPipeSinkPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowStorageGroupPlan;
@@ -419,6 +421,10 @@ public class PlanExecutor implements IPlanExecutor {
       case DROP_PIPESINK:
         dropPipeSink((DropPipeSinkPlan) plan);
         return true;
+      case START_PIPE_SERVER:
+        return operateStartPipeServer();
+      case STOP_PIPE_SERVER:
+        return operateStopPipeServer();
       case CREATE_PIPE:
         createPipe((CreatePipePlan) plan);
         return true;
@@ -433,6 +439,14 @@ public class PlanExecutor implements IPlanExecutor {
     }
   }
 
+  private boolean operateStopPipeServer() {
+    return ReceiverService.getInstance().stopPipeServer();
+  }
+
+  private boolean operateStartPipeServer() {
+    return ReceiverService.getInstance().startPipeServer();
+  }
+
   private boolean createTemplate(CreateTemplatePlan createTemplatePlan)
       throws QueryProcessException {
     try {
@@ -718,6 +732,8 @@ public class PlanExecutor implements IPlanExecutor {
         return processShowPipeSink((ShowPipeSinkPlan) showPlan);
       case PIPESINKTYPE:
         return processShowPipeSinkType();
+      case PIPESERVER:
+        return processShowPipeServer((ShowPipeServerPlan) showPlan);
       case PIPE:
         return processShowPipes((ShowPipePlan) showPlan);
       default:
@@ -725,6 +741,10 @@ public class PlanExecutor implements IPlanExecutor {
     }
   }
 
+  private QueryDataSet processShowPipeServer(ShowPipeServerPlan plan) {
+    return ReceiverService.getInstance().showPipe(plan);
+  }
+
   private QueryDataSet processCountNodes(CountPlan countPlan) throws MetadataException {
     int num =
         getNodesNumInGivenLevel(
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index 522d3f8..2a82e6e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -193,9 +193,7 @@ public abstract class Operator {
     PRUNE_TEMPLATE,
     APPEND_TEMPLATE,
     DROP_TEMPLATE,
-
     SHOW_QUERY_RESOURCE,
-
     CREATE_PIPESINK,
     DROP_PIPESINK,
     SHOW_PIPESINK,
@@ -204,6 +202,8 @@ public abstract class Operator {
     SHOW_PIPE,
     STOP_PIPE,
     START_PIPE,
-    DROP_PIPE
+    DROP_PIPE,
+    START_PIPE_SERVER,
+    STOP_PIPE_SERVER,
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeServerOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeServerOperator.java
new file mode 100644
index 0000000..29801d1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeServerOperator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.iotdb.db.qp.logical.sys;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
+import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+
+import org.apache.commons.lang3.StringUtils;
+
+public class ShowPipeServerOperator extends ShowOperator {
+
+  private String pipeName;
+
+  public ShowPipeServerOperator(String pipeName, int tokenIntType) {
+    this(tokenIntType);
+    this.pipeName = pipeName;
+  }
+
+  public ShowPipeServerOperator(int tokenIntType) {
+    super(tokenIntType);
+  }
+
+  @Override
+  public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
+      throws QueryProcessException {
+    if (StringUtils.isEmpty(pipeName)) {
+      return new ShowPipeServerPlan();
+    } else {
+      return new ShowPipeServerPlan(pipeName);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StartPipeServerOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StartPipeServerOperator.java
new file mode 100644
index 0000000..e23833a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StartPipeServerOperator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.logical.sys;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.StartPipeServerPlan;
+import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+
+public class StartPipeServerOperator extends Operator {
+  public StartPipeServerOperator(int tokenIntType) {
+    super(tokenIntType);
+    operatorType = OperatorType.START_PIPE_SERVER;
+  }
+
+  @Override
+  public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
+      throws QueryProcessException {
+    return new StartPipeServerPlan();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StopPipeServerOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StopPipeServerOperator.java
new file mode 100644
index 0000000..f5bcdb2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StopPipeServerOperator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.logical.sys;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.StopPipeServerPlan;
+import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+
+public class StopPipeServerOperator extends Operator {
+  public StopPipeServerOperator(int tokenIntType) {
+    super(tokenIntType);
+    operatorType = OperatorType.START_PIPE_SERVER;
+  }
+
+  @Override
+  public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
+      throws QueryProcessException {
+    return new StopPipeServerPlan();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index 91cc485..0b59db5 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -68,7 +68,9 @@ import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.StartPipeServerPlan;
 import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan;
+import org.apache.iotdb.db.qp.physical.sys.StopPipeServerPlan;
 import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan;
 import org.apache.iotdb.db.qp.physical.sys.StorageGroupMNodePlan;
 import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
@@ -483,6 +485,12 @@ public abstract class PhysicalPlan {
         case SET_SYSTEM_MODE:
           plan = new SetSystemModePlan();
           break;
+        case START_PIPE_SERVER:
+          plan = new StartPipeServerPlan();
+          break;
+        case STOP_PIPE_SERVER:
+          plan = new StopPipeServerPlan();
+          break;
         default:
           throw new IOException("unrecognized log type " + type);
       }
@@ -553,6 +561,8 @@ public abstract class PhysicalPlan {
     UNSET_TEMPLATE,
     APPEND_TEMPLATE,
     PRUNE_TEMPLATE,
+    START_PIPE_SERVER,
+    STOP_PIPE_SERVER,
     DROP_TEMPLATE
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeServerPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeServerPlan.java
new file mode 100644
index 0000000..955bdf5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeServerPlan.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.physical.sys;
+
+public class ShowPipeServerPlan extends ShowPlan {
+
+  private String pipeName;
+
+  public ShowPipeServerPlan(String pipeName) {
+    super(ShowContentType.PIPESERVER);
+    this.pipeName = pipeName;
+  }
+
+  public ShowPipeServerPlan() {
+    super(ShowContentType.PIPESERVER);
+  }
+
+  public String getPipeName() {
+    return pipeName;
+  }
+
+  public void setPipeName(String pipeName) {
+    this.pipeName = pipeName;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
index 3d3c00f..961479d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
@@ -120,6 +120,7 @@ public class ShowPlan extends PhysicalPlan {
     QUERY_RESOURCE,
     PIPESINK,
     PIPESINKTYPE,
-    PIPE
+    PIPE,
+    PIPESERVER
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StartPipeServerPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StartPipeServerPlan.java
new file mode 100644
index 0000000..a08e5b6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StartPipeServerPlan.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.physical.sys;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+public class StartPipeServerPlan extends PhysicalPlan {
+
+  public StartPipeServerPlan() {
+    super(Operator.OperatorType.START_PIPE_SERVER);
+    canBeSplit = false;
+  }
+
+  @Override
+  public List<? extends PartialPath> getPaths() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.writeByte((byte) PhysicalPlanType.START_PIPE_SERVER.ordinal());
+  }
+
+  @Override
+  public void serializeImpl(ByteBuffer buffer) {
+    buffer.put((byte) PhysicalPlanType.START_PIPE_SERVER.ordinal());
+  }
+
+  @Override
+  public void deserialize(ByteBuffer buffer) throws IllegalPathException {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StopPipeServerPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StopPipeServerPlan.java
new file mode 100644
index 0000000..45f9270
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StopPipeServerPlan.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.physical.sys;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+public class StopPipeServerPlan extends PhysicalPlan {
+
+  public StopPipeServerPlan() {
+    super(Operator.OperatorType.STOP_PIPE_SERVER);
+    canBeSplit = false;
+  }
+
+  @Override
+  public List<? extends PartialPath> getPaths() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.writeByte((byte) PhysicalPlanType.START_PIPE_SERVER.ordinal());
+  }
+
+  @Override
+  public void serializeImpl(ByteBuffer buffer) {
+    buffer.put((byte) PhysicalPlanType.START_PIPE_SERVER.ordinal());
+  }
+
+  @Override
+  public void deserialize(ByteBuffer buffer) throws IllegalPathException {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
index 273b774..fc923cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
@@ -2282,6 +2282,25 @@ public class IoTDBSqlVisitor extends IoTDBSqlParserBaseVisitor<Operator> {
     }
   }
 
+  @Override
+  public Operator visitStartPipeServer(IoTDBSqlParser.StartPipeServerContext ctx) {
+    return new StartPipeServerOperator(SQLConstant.TOK_PIPE_SERVER_START);
+  }
+
+  @Override
+  public Operator visitStopPipeServer(IoTDBSqlParser.StopPipeServerContext ctx) {
+    return new StopPipeServerOperator(SQLConstant.TOK_PIPE_SERVER_STOP);
+  }
+
+  @Override
+  public Operator visitShowPipeServer(IoTDBSqlParser.ShowPipeServerContext ctx) {
+    if (ctx.pipeName != null) {
+      return new ShowPipeServerOperator(ctx.pipeName.getText(), SQLConstant.TOK_SHOW_PIPE_SERVER);
+    } else {
+      return new ShowPipeServerOperator(SQLConstant.TOK_SHOW_PIPE_SERVER);
+    }
+  }
+
   /** 7. Common Clauses */
 
   // IoTDB Objects
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 7ae98e3..80c236f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.exception.ConfigurationException;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.newsync.receiver.ReceiverService;
 import org.apache.iotdb.db.newsync.sender.service.SenderService;
 import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager;
 import org.apache.iotdb.db.protocol.rest.RestService;
@@ -134,6 +135,7 @@ public class IoTDB implements IoTDBMBean {
     registerManager.register(TemporaryQueryDataFileService.getInstance());
     registerManager.register(UDFClassLoaderManager.getInstance());
     registerManager.register(UDFRegistrationService.getInstance());
+    registerManager.register(ReceiverService.getInstance());
     registerManager.register(MetricsService.getInstance());
 
     // in cluster mode, RPC service is not enabled.
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index eb82e29..bf8946e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -39,6 +39,7 @@ public enum ServiceType {
   UPGRADE_SERVICE("UPGRADE DataService", ""),
   SETTLE_SERVICE("SETTLE DataService", ""),
   SENDER_SERVICE("Sync Sender service", ""),
+  RECEIVER_SERVICE("Sync Receiver service", ""),
   MERGE_SERVICE("Merge Manager", "Merge Manager"),
   COMPACTION_SERVICE("Compaction Manager", "Compaction Manager"),
   PERFORMANCE_STATISTIC_SERVICE("PERFORMANCE_STATISTIC_SERVICE", "PERFORMANCE_STATISTIC_SERVICE"),
diff --git a/server/src/test/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManagerTest.java b/server/src/test/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManagerTest.java
new file mode 100644
index 0000000..081e01c
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManagerTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.receiver.manager;
+
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+public class ReceiverManagerTest {
+  private static final String pipe1 = "pipe1";
+  private static final String pipe2 = "pipe2";
+  private static final String ip1 = "192.168.1.11";
+  private static final String ip2 = "192.168.2.22";
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.envSetUp();
+  }
+
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void test() {
+    try {
+      ReceiverManager manager = ReceiverManager.getInstance();
+      manager.startServer();
+      manager.createPipe(pipe1, ip1, 1);
+      manager.createPipe(pipe2, ip2, 2);
+      manager.createPipe(pipe1, ip2, 3);
+      manager.stopPipe(pipe1, ip1);
+      manager.stopPipe(pipe2, ip2);
+      manager.dropPipe(pipe1, ip2);
+      manager.startPipe(pipe1, ip1);
+      List<PipeInfo> allPipeInfos = manager.getAllPipeInfos();
+      Assert.assertEquals(3, allPipeInfos.size());
+      List<PipeInfo> pipeInfos1 = manager.getPipeInfos(pipe1);
+      List<PipeInfo> pipeInfos2 = manager.getPipeInfos(pipe2);
+      Assert.assertEquals(2, pipeInfos1.size());
+      Assert.assertEquals(1, pipeInfos2.size());
+      for (PipeInfo pipeInfo : pipeInfos2) {
+        Assert.assertEquals(new PipeInfo(pipe2, ip2, PipeStatus.PAUSE, 2), pipeInfo);
+      }
+      for (PipeInfo pipeInfo : pipeInfos1) {
+        if (pipeInfo.getRemoteIp().equals(ip1)) {
+          Assert.assertEquals(new PipeInfo(pipe1, ip1, PipeStatus.RUNNING, 1), pipeInfo);
+        } else {
+          Assert.assertEquals(new PipeInfo(pipe1, ip2, PipeStatus.DROP, 3), pipeInfo);
+        }
+      }
+    } catch (Exception e) {
+      Assert.fail();
+      e.printStackTrace();
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzerTest.java
new file mode 100644
index 0000000..536792a
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.receiver.recovery;
+
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.newsync.receiver.manager.PipeInfo;
+import org.apache.iotdb.db.newsync.receiver.manager.PipeStatus;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+/** This test is for ReceiverLog and ReceiverLogAnalyzer */
+public class ReceiverLogAnalyzerTest {
+
+  private static final String pipe1 = "pipe1";
+  private static final String pipe2 = "pipe2";
+  private static final String ip1 = "192.168.1.11";
+  private static final String ip2 = "192.168.2.22";
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.envSetUp();
+  }
+
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void test() {
+    try {
+      ReceiverLog log = new ReceiverLog();
+      log.startPipeServer();
+      log.createPipe(pipe1, ip1, 1);
+      log.createPipe(pipe2, ip2, 2);
+      log.createPipe(pipe1, ip2, 3);
+      log.stopPipe(pipe1, ip1);
+      log.stopPipeServer();
+      log.startPipeServer();
+      log.stopPipe(pipe2, ip2);
+      log.dropPipe(pipe1, ip2);
+      log.startPipe(pipe1, ip1);
+      ReceiverLogAnalyzer.scan();
+      Map<String, Map<String, PipeInfo>> map = ReceiverLogAnalyzer.getPipeInfoMap();
+      Assert.assertTrue(ReceiverLogAnalyzer.isPipeServerEnable());
+      Assert.assertNotNull(map);
+      Assert.assertEquals(2, map.get(pipe1).size());
+      Assert.assertEquals(1, map.get(pipe2).size());
+      Assert.assertEquals(1, map.get(pipe2).size());
+      Assert.assertEquals(new PipeInfo(pipe2, ip2, PipeStatus.PAUSE, 2), map.get(pipe2).get(ip2));
+      Assert.assertEquals(new PipeInfo(pipe1, ip1, PipeStatus.RUNNING, 1), map.get(pipe1).get(ip1));
+      Assert.assertEquals(new PipeInfo(pipe1, ip2, PipeStatus.DROP, 3), map.get(pipe1).get(ip2));
+      log.close();
+    } catch (Exception e) {
+      Assert.fail();
+      e.printStackTrace();
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java
index ea34f3b..af6a033 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.runtime.SQLParserException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.Planner;
+import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
@@ -53,9 +54,12 @@ import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowContinuousQueriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowFunctionsPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowTriggersPlan;
+import org.apache.iotdb.db.qp.physical.sys.StartPipeServerPlan;
 import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan;
+import org.apache.iotdb.db.qp.physical.sys.StopPipeServerPlan;
 import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan;
 import org.apache.iotdb.db.query.executor.fill.PreviousFill;
 import org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
@@ -1210,6 +1214,36 @@ public class PhysicalPlanTest {
   }
 
   @Test
+  public void testShowPipeServer() throws QueryProcessException {
+    String sql1 = "SHOW PIPESERVER abc";
+    ShowPipeServerPlan plan1 = (ShowPipeServerPlan) processor.parseSQLToPhysicalPlan(sql1);
+    Assert.assertTrue(plan1.isQuery());
+    Assert.assertEquals(ShowPlan.ShowContentType.PIPESERVER, plan1.getShowContentType());
+    Assert.assertNotNull(plan1.getPipeName());
+    String sql2 = "SHOW PIPESERVER";
+    ShowPipeServerPlan plan2 = (ShowPipeServerPlan) processor.parseSQLToPhysicalPlan(sql2);
+    Assert.assertTrue(plan2.isQuery());
+    Assert.assertEquals(ShowPlan.ShowContentType.PIPESERVER, plan2.getShowContentType());
+    Assert.assertNull(plan2.getPipeName());
+  }
+
+  @Test
+  public void testStartPipeServer() throws QueryProcessException {
+    String sql = "START PIPESERVER";
+    StartPipeServerPlan plan = (StartPipeServerPlan) processor.parseSQLToPhysicalPlan(sql);
+    Assert.assertFalse(plan.isQuery());
+    Assert.assertEquals(Operator.OperatorType.START_PIPE_SERVER, plan.getOperatorType());
+  }
+
+  @Test
+  public void testStopPipeServer() throws QueryProcessException {
+    String sql = "STOP PIPESERVER";
+    StopPipeServerPlan plan = (StopPipeServerPlan) processor.parseSQLToPhysicalPlan(sql);
+    Assert.assertFalse(plan.isQuery());
+    Assert.assertEquals(OperatorType.STOP_PIPE_SERVER, plan.getOperatorType());
+  }
+
+  @Test
   public void testCreateCQ1() throws QueryProcessException {
     String sql =
         "CREATE CONTINUOUS QUERY cq1 BEGIN SELECT max_value(temperature) INTO temperature_max FROM root.ln.*.*.* GROUP BY time(10s) END";