You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/11/11 03:39:59 UTC

[iotdb] branch master updated: [IOTDB-4908] Clear deprecated code related to sync in old standalone (#7965)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 600e376bce [IOTDB-4908] Clear deprecated code related to sync in old standalone (#7965)
600e376bce is described below

commit 600e376bce5faf70bf379d44936fd45d33e08fe6
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Fri Nov 11 11:39:52 2022 +0800

    [IOTDB-4908] Clear deprecated code related to sync in old standalone (#7965)
---
 .../db/integration/sync/IoTDBSyncReceiverIT.java   |  59 -----------
 .../sync/IoTDBSyncReceiverLoaderIT.java            |  76 +-------------
 .../db/integration/sync/IoTDBSyncSenderIT.java     |  54 ++--------
 .../apache/iotdb/commons/conf/IoTDBConstant.java   |  15 ---
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       |   3 +-
 .../sys/sync/CreatePipeSinkStatement.java          |   7 +-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  77 ---------------
 .../db/qp/logical/sys/CreatePipeOperator.java      |  12 +--
 .../db/qp/logical/sys/CreatePipeSinkOperator.java  |  11 +--
 .../iotdb/db/qp/logical/sys/DropPipeOperator.java  |   4 +-
 .../db/qp/logical/sys/DropPipeSinkOperator.java    |   4 +-
 .../iotdb/db/qp/logical/sys/ShowPipeOperator.java  |   4 +-
 .../db/qp/logical/sys/ShowPipeSinkOperator.java    |   4 +-
 .../qp/logical/sys/ShowPipeSinkTypeOperator.java   |   4 +-
 .../iotdb/db/qp/logical/sys/StartPipeOperator.java |   4 +-
 .../iotdb/db/qp/logical/sys/StopPipeOperator.java  |   4 +-
 .../iotdb/db/qp/physical/sys/CreatePipePlan.java   | 110 ---------------------
 .../db/qp/physical/sys/CreatePipeSinkPlan.java     |  98 ------------------
 .../iotdb/db/qp/physical/sys/DropPipeSinkPlan.java |  45 ---------
 .../iotdb/db/qp/physical/sys/OperatePipePlan.java  |  45 ---------
 .../iotdb/db/qp/physical/sys/ShowPipePlan.java     |  33 -------
 .../iotdb/db/qp/physical/sys/ShowPipeSinkPlan.java |  33 -------
 .../db/qp/physical/sys/ShowPipeSinkTypePlan.java   |  26 -----
 .../apache/iotdb/db/qp/physical/sys/ShowPlan.java  |   3 -
 .../java/org/apache/iotdb/db/sync/SyncService.java |  77 ---------------
 .../db/sync/common/ClusterSyncInfoFetcher.java     |   6 --
 .../iotdb/db/sync/common/ISyncInfoFetcher.java     |   3 -
 .../apache/iotdb/db/sync/common/LocalSyncInfo.java |  10 --
 .../iotdb/db/sync/common/LocalSyncInfoFetcher.java |  10 --
 .../apache/iotdb/db/sync/pipedata/PipeData.java    |   4 -
 .../iotdb/db/sync/pipedata/SchemaPipeData.java     | 100 -------------------
 .../iotdb/db/sync/receiver/load/SchemaLoader.java  |  71 -------------
 .../iotdb/db/sync/receiver/load/TsFileLoader.java  |  12 ---
 .../apache/iotdb/db/utils/sync/SyncPipeUtil.java   |  37 -------
 .../db/sync/persistence/LocalSyncInfoTest.java     |  16 ++-
 35 files changed, 45 insertions(+), 1036 deletions(-)

diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
index e07f0e2a86..876dc775ff 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
@@ -22,23 +22,14 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.sync.pipesink.IoTDBPipeSink;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.db.sync.pipedata.DeletionPipeData;
 import org.apache.iotdb.db.sync.pipedata.PipeData;
-import org.apache.iotdb.db.sync.pipedata.SchemaPipeData;
 import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
 import org.apache.iotdb.db.sync.sender.pipe.Pipe;
 import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
 import org.apache.iotdb.db.sync.transport.client.IoTDBSyncClient;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.itbase.category.LocalStandaloneTest;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.apache.commons.io.FileUtils;
 import org.junit.After;
@@ -50,8 +41,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 @Category({LocalStandaloneTest.class})
@@ -121,54 +110,6 @@ public class IoTDBSyncReceiverIT {
 
       // 2. send pipe data
       int serialNum = 0;
-      List<PhysicalPlan> planList = new ArrayList<>();
-      planList.add(new SetStorageGroupPlan(new PartialPath("root.vehicle")));
-      planList.add(
-          new CreateTimeSeriesPlan(
-              new PartialPath("root.vehicle.d0.s0"),
-              new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.RLE)));
-      planList.add(
-          new CreateTimeSeriesPlan(
-              new PartialPath("root.vehicle.d0.s1"),
-              new MeasurementSchema("s1", TSDataType.TEXT, TSEncoding.PLAIN)));
-      planList.add(
-          new CreateTimeSeriesPlan(
-              new PartialPath("root.vehicle.d1.s2"),
-              new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE)));
-      planList.add(
-          new CreateTimeSeriesPlan(
-              new PartialPath("root.vehicle.d1.s3"),
-              new MeasurementSchema("s3", TSDataType.BOOLEAN, TSEncoding.PLAIN)));
-      planList.add(new SetStorageGroupPlan(new PartialPath("root.sg1")));
-      planList.add(
-          new CreateAlignedTimeSeriesPlan(
-              new PartialPath("root.sg1.d1"),
-              Arrays.asList("s1", "s2", "s3", "s4", "s5"),
-              Arrays.asList(
-                  TSDataType.FLOAT,
-                  TSDataType.INT32,
-                  TSDataType.INT64,
-                  TSDataType.BOOLEAN,
-                  TSDataType.TEXT),
-              Arrays.asList(
-                  TSEncoding.RLE,
-                  TSEncoding.GORILLA,
-                  TSEncoding.RLE,
-                  TSEncoding.RLE,
-                  TSEncoding.PLAIN),
-              Arrays.asList(
-                  CompressionType.SNAPPY,
-                  CompressionType.SNAPPY,
-                  CompressionType.SNAPPY,
-                  CompressionType.SNAPPY,
-                  CompressionType.SNAPPY),
-              null,
-              null,
-              null));
-      planList.add(new SetStorageGroupPlan(new PartialPath("root.sg1")));
-      for (PhysicalPlan plan : planList) {
-        client.send(new SchemaPipeData(plan, serialNum++));
-      }
       List<File> tsFiles = SyncTestUtil.getTsFilePaths(tmpDir);
       SyncTestUtil.renameTsFiles(tsFiles);
       for (File f : tsFiles) {
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverLoaderIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverLoaderIT.java
index 69a4e0763a..b2bf607184 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverLoaderIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverLoaderIT.java
@@ -21,20 +21,11 @@ package org.apache.iotdb.db.integration.sync;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.db.sync.receiver.load.DeletionLoader;
 import org.apache.iotdb.db.sync.receiver.load.ILoader;
-import org.apache.iotdb.db.sync.receiver.load.SchemaLoader;
 import org.apache.iotdb.db.sync.receiver.load.TsFileLoader;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.itbase.category.LocalStandaloneTest;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.apache.commons.io.FileUtils;
 import org.junit.After;
@@ -46,8 +37,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 @Category({LocalStandaloneTest.class})
@@ -96,62 +85,7 @@ public class IoTDBSyncReceiverLoaderIT {
     EnvironmentUtils.cleanEnv();
     EnvironmentUtils.envSetUp();
 
-    // 2. test for SchemaLoader
-    List<PhysicalPlan> planList = new ArrayList<>();
-    planList.add(new SetStorageGroupPlan(new PartialPath("root.vehicle")));
-    planList.add(
-        new CreateTimeSeriesPlan(
-            new PartialPath("root.vehicle.d0.s0"),
-            new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.RLE)));
-    planList.add(
-        new CreateTimeSeriesPlan(
-            new PartialPath("root.vehicle.d0.s1"),
-            new MeasurementSchema("s1", TSDataType.TEXT, TSEncoding.PLAIN)));
-    planList.add(
-        new CreateTimeSeriesPlan(
-            new PartialPath("root.vehicle.d1.s2"),
-            new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE)));
-    planList.add(
-        new CreateTimeSeriesPlan(
-            new PartialPath("root.vehicle.d1.s3"),
-            new MeasurementSchema("s3", TSDataType.BOOLEAN, TSEncoding.PLAIN)));
-    planList.add(new SetStorageGroupPlan(new PartialPath("root.sg1")));
-    planList.add(
-        new CreateAlignedTimeSeriesPlan(
-            new PartialPath("root.sg1.d1"),
-            Arrays.asList("s1", "s2", "s3", "s4", "s5"),
-            Arrays.asList(
-                TSDataType.FLOAT,
-                TSDataType.INT32,
-                TSDataType.INT64,
-                TSDataType.BOOLEAN,
-                TSDataType.TEXT),
-            Arrays.asList(
-                TSEncoding.RLE,
-                TSEncoding.GORILLA,
-                TSEncoding.RLE,
-                TSEncoding.RLE,
-                TSEncoding.PLAIN),
-            Arrays.asList(
-                CompressionType.SNAPPY,
-                CompressionType.SNAPPY,
-                CompressionType.SNAPPY,
-                CompressionType.SNAPPY,
-                CompressionType.SNAPPY),
-            null,
-            null,
-            null));
-    for (PhysicalPlan plan : planList) {
-      ILoader planLoader = new SchemaLoader(plan);
-      try {
-        planLoader.load();
-      } catch (Exception e) {
-        e.printStackTrace();
-        Assert.fail();
-      }
-    }
-
-    // 3. test for TsFileLoader
+    // 2. test for TsFileLoader
     List<File> tsFiles = SyncTestUtil.getTsFilePaths(tmpDir);
     for (File tsfile : tsFiles) {
       ILoader tsFileLoader = new TsFileLoader(tsfile, "root.vehicle");
@@ -163,7 +97,7 @@ public class IoTDBSyncReceiverLoaderIT {
       }
     }
 
-    // 4. test for DeletionPlanLoader
+    // 3. test for DeletionLoader
     Deletion deletion = new Deletion(new PartialPath("root.vehicle.**"), 0, 33, 38);
     ILoader deletionLoader = new DeletionLoader(deletion);
     try {
@@ -173,8 +107,8 @@ public class IoTDBSyncReceiverLoaderIT {
       Assert.fail();
     }
 
-    // 5. check result after loading
-    // 5.1 check normal timeseries
+    // 4. check result after loading
+    // 4.1 check normal timeseries
     String sql1 = "select * from root.vehicle.*";
     String[] retArray1 =
         new String[] {
@@ -190,7 +124,7 @@ public class IoTDBSyncReceiverLoaderIT {
       "root.vehicle.d0.s0", "root.vehicle.d0.s1", "root.vehicle.d1.s3", "root.vehicle.d1.s2"
     };
     SyncTestUtil.checkResult(sql1, columnNames1, retArray1);
-    // 5.2 check aligned timeseries
+    // 4.2 check aligned timeseries
     String sql2 = "select * from root.sg1.d1";
     String[] retArray2 =
         new String[] {
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
index a0de7ddd86..772b4ed4e9 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
@@ -22,12 +22,10 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.sync.pipesink.IoTDBPipeSink;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipeSinkTypePlan;
 import org.apache.iotdb.db.sync.SyncService;
 import org.apache.iotdb.db.sync.common.LocalSyncInfoFetcher;
 import org.apache.iotdb.db.sync.pipedata.DeletionPipeData;
 import org.apache.iotdb.db.sync.pipedata.PipeData;
-import org.apache.iotdb.db.sync.pipedata.SchemaPipeData;
 import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.wal.recover.WALRecoverManager;
@@ -68,8 +66,6 @@ public class IoTDBSyncSenderIT {
   private final Map<String, List<PipeData>> resultMap = new HashMap<>();
   private static final TsFilePipeData simpleTsFilePipeData =
       new TsFilePipeData("path", "tsfile", 0L);
-  private static final SchemaPipeData simpleSchemaPipeData =
-      new SchemaPipeData(new ShowPipeSinkTypePlan(), 0L);
   private static final DeletionPipeData simpleDeletionPipeData =
       new DeletionPipeData(new Deletion(new PartialPath(), 0L, 0L), 0L);
 
@@ -105,33 +101,6 @@ public class IoTDBSyncSenderIT {
     EnvironmentUtils.cleanEnv();
   }
 
-  private void prepareSchema() throws Exception { // 8 schema plans
-    try (Connection connection =
-            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
-        Statement statement = connection.createStatement()) {
-      statement.execute("set storage group to root.sg1");
-      statement.execute("set storage group to root.sg2");
-      statement.execute("create timeseries root.sg1.d1.s1 with datatype=int32, encoding=PLAIN");
-      statement.execute("create timeseries root.sg1.d1.s2 with datatype=float, encoding=RLE");
-      statement.execute("create timeseries root.sg1.d1.s3 with datatype=TEXT, encoding=PLAIN");
-      statement.execute("create timeseries root.sg1.d2.s4 with datatype=int64, encoding=PLAIN");
-      statement.execute("create timeseries root.sg2.d1.s0 with datatype=int32, encoding=PLAIN");
-      statement.execute("create timeseries root.sg2.d2.s1 with datatype=boolean, encoding=PLAIN");
-    }
-
-    List<PipeData> resultList = new ArrayList<>();
-    for (int i = 0; i < 4; i++) {
-      resultList.add(simpleSchemaPipeData);
-    }
-    resultMap.put("schemaWithDel3InHistory", resultList); // del3 in history
-
-    resultList = new ArrayList<>();
-    for (int i = 0; i < 8; i++) {
-      resultList.add(simpleSchemaPipeData);
-    }
-    resultMap.put("schema", resultList); // del3 do not in history
-  }
-
   private void prepareIns1() throws Exception { // add one seq tsfile in sg1
     try (Connection connection =
             DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
@@ -238,9 +207,6 @@ public class IoTDBSyncSenderIT {
     for (int i = 0; i < 3; i++) {
       resultList.add(simpleDeletionPipeData);
     }
-    for (int i = 0; i < 2; i++) {
-      resultList.add(simpleSchemaPipeData);
-    }
     resultMap.put("del3", resultList);
   }
 
@@ -288,9 +254,6 @@ public class IoTDBSyncSenderIT {
 
   private void checkInsOnlyResult(List<PipeData> list) { // check ins1, ins2, ins3
     Assert.assertEquals(13, list.size());
-    for (int i = 0; i < 8; i++) {
-      Assert.assertTrue(list.get(i) instanceof SchemaPipeData);
-    }
     for (int i = 9; i < list.size(); i++) {
       Assert.assertTrue(list.get(i) instanceof TsFilePipeData);
     }
@@ -313,7 +276,7 @@ public class IoTDBSyncSenderIT {
   @Test
   public void testHistoryInsert() {
     try {
-      prepareSchema(); // history
+      // history
       prepareIns1();
       prepareIns2();
       prepareIns3();
@@ -340,7 +303,7 @@ public class IoTDBSyncSenderIT {
   @Test
   public void testHistoryAndRealTimeInsert() {
     try {
-      prepareSchema(); // history
+      // history
       prepareIns1();
       prepareIns2();
 
@@ -368,7 +331,7 @@ public class IoTDBSyncSenderIT {
   @Test
   public void testStopAndStartInsert() {
     try {
-      prepareSchema(); // history
+      // history
       prepareIns1();
 
       preparePipeAndSetMock(); // realtime
@@ -399,7 +362,6 @@ public class IoTDBSyncSenderIT {
     try {
       preparePipeAndSetMock(); // realtime
       startPipe();
-      prepareSchema();
       prepareIns1();
       stopPipe();
       prepareIns2();
@@ -426,7 +388,6 @@ public class IoTDBSyncSenderIT {
   @Test
   public void testHistoryDel() {
     try {
-      prepareSchema(); // history
       prepareIns1();
       prepareIns2();
       prepareIns3();
@@ -459,7 +420,6 @@ public class IoTDBSyncSenderIT {
   @Ignore
   public void testRealtimeDel() {
     try {
-      prepareSchema(); // history
       prepareIns1();
 
       preparePipeAndSetMock(); // realtime
@@ -494,7 +454,7 @@ public class IoTDBSyncSenderIT {
   @Test
   public void testRestartWhileRunning() {
     try {
-      prepareSchema(); // history
+      // history
       prepareIns1();
       prepareIns2();
 
@@ -522,7 +482,7 @@ public class IoTDBSyncSenderIT {
   @Test
   public void testRestartWhileStopping() {
     try {
-      prepareSchema(); // history
+      // history
       prepareIns1();
 
       preparePipeAndSetMock(); // realtime
@@ -552,7 +512,7 @@ public class IoTDBSyncSenderIT {
   @Test
   public void testRestartWithDel() {
     try {
-      prepareSchema(); // history
+      // history
       prepareIns1();
       prepareDel1();
 
@@ -589,7 +549,7 @@ public class IoTDBSyncSenderIT {
   @Test
   public void testRestartWithUnsealedTsFile() {
     try {
-      prepareSchema(); // history
+      // history
       prepareIns1();
       prepareIns2();
       prepareDel1();
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index 9245336015..aa71c1acb4 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -179,24 +179,9 @@ public class IoTDBConstant {
   public static final String COLUMN_TRIGGER_STATUS_STARTED = "started";
   public static final String COLUMN_TRIGGER_STATUS_STOPPED = "stopped";
 
-  // sync module
-  // TODO(sync): delete this in new-standalone version
-  public static final String COLUMN_PIPESERVER_STATUS = "enable";
-  public static final String COLUMN_PIPESINK_NAME = "name";
-  public static final String COLUMN_PIPESINK_TYPE = "type";
-  public static final String COLUMN_PIPESINK_ATTRIBUTES = "attributes";
-  public static final String COLUMN_PIPE_NAME = "name";
-  public static final String COLUMN_PIPE_CREATE_TIME = "create time";
-  public static final String COLUMN_PIPE_ROLE = "role";
-  public static final String COLUMN_PIPE_REMOTE = "remote";
-  public static final String COLUMN_PIPE_STATUS = "status";
-  public static final String COLUMN_PIPE_MSG = "message";
-
   public static final String ONE_LEVEL_PATH_WILDCARD = "*";
   public static final String MULTI_LEVEL_PATH_WILDCARD = "**";
   public static final String TIME = "time";
-  public static final String SYNC_SENDER_ROLE = "sender";
-  public static final String SYNC_RECEIVER_ROLE = "receiver";
 
   // sdt parameters
   public static final String LOSS = "loss";
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 46eb80eadc..1ce8194f91 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -3200,8 +3200,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
   @Override
   public Statement visitCreatePipeSink(IoTDBSqlParser.CreatePipeSinkContext ctx) {
 
-    CreatePipeSinkStatement createPipeSinkStatement =
-        new CreatePipeSinkStatement(StatementType.CREATE_PIPESINK);
+    CreatePipeSinkStatement createPipeSinkStatement = new CreatePipeSinkStatement();
 
     if (ctx.pipeSinkName != null) {
       createPipeSinkStatement.setPipeSinkName(parseIdentifier(ctx.pipeSinkName.getText()));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/sync/CreatePipeSinkStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/sync/CreatePipeSinkStatement.java
index 1ce63010f0..2659505985 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/sync/CreatePipeSinkStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/sync/CreatePipeSinkStatement.java
@@ -41,8 +41,8 @@ public class CreatePipeSinkStatement extends Statement implements IConfigStateme
 
   private Map<String, String> attributes;
 
-  public CreatePipeSinkStatement(StatementType createPipeSinkStatement) {
-    this.statementType = createPipeSinkStatement;
+  public CreatePipeSinkStatement() {
+    this.statementType = StatementType.CREATE_PIPESINK;
   }
 
   public String getPipeSinkName() {
@@ -90,8 +90,7 @@ public class CreatePipeSinkStatement extends Statement implements IConfigStateme
       throw new IOException(
           "Parsing CreatePipeSinkStatement error. Attributes is less than expected.");
     }
-    CreatePipeSinkStatement createPipeSinkStatement =
-        new CreatePipeSinkStatement(StatementType.CREATE_PIPESINK);
+    CreatePipeSinkStatement createPipeSinkStatement = new CreatePipeSinkStatement();
     createPipeSinkStatement.setPipeSinkName(split[0]);
     createPipeSinkStatement.setPipeSinkType(split[1]);
     int size = (Integer.parseInt(split[2]) << 1);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index fa78332e35..7488f0d5f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.sync.pipesink.PipeSink;
 import org.apache.iotdb.commons.udf.UDFInformation;
 import org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction;
 import org.apache.iotdb.commons.udf.service.UDFManagementService;
@@ -67,8 +66,6 @@ import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowNodesInTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPathsSetTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPathsUsingTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipePlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipeSinkPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
@@ -85,7 +82,6 @@ import org.apache.iotdb.db.query.dataset.SingleDataSet;
 import org.apache.iotdb.db.query.executor.IQueryRouter;
 import org.apache.iotdb.db.query.executor.QueryRouter;
 import org.apache.iotdb.db.service.IoTDB;
-import org.apache.iotdb.db.sync.SyncService;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -106,7 +102,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
@@ -126,15 +121,6 @@ import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_FUNCTION_CLASS;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_FUNCTION_NAME;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_FUNCTION_TYPE;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_ITEM;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PIPESINK_ATTRIBUTES;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PIPESINK_NAME;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PIPESINK_TYPE;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PIPE_CREATE_TIME;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PIPE_MSG;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PIPE_NAME;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PIPE_REMOTE;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PIPE_ROLE;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PIPE_STATUS;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PRIVILEGE;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_ROLE;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_SCHEMA_TEMPLATE;
@@ -295,12 +281,6 @@ public class PlanExecutor implements IPlanExecutor {
         return processShowPathsSetSchemaTemplate((ShowPathsSetTemplatePlan) showPlan);
       case PATHS_USING_SCHEMA_TEMPLATE:
         return processShowPathsUsingSchemaTemplate((ShowPathsUsingTemplatePlan) showPlan);
-      case PIPESINK:
-        return processShowPipeSink((ShowPipeSinkPlan) showPlan);
-      case PIPESINKTYPE:
-        return processShowPipeSinkType();
-      case PIPE:
-        return processShowPipes((ShowPipePlan) showPlan);
       default:
         throw new QueryProcessException(String.format("Unrecognized show plan %s", showPlan));
     }
@@ -767,63 +747,6 @@ public class PlanExecutor implements IPlanExecutor {
     listDataSet.putRecord(rowRecord);
   }
 
-  private QueryDataSet processShowPipeSink(ShowPipeSinkPlan plan) {
-    ListDataSet listDataSet =
-        new ListDataSet(
-            Arrays.asList(
-                new PartialPath(COLUMN_PIPESINK_NAME, false),
-                new PartialPath(COLUMN_PIPESINK_TYPE, false),
-                new PartialPath(COLUMN_PIPESINK_ATTRIBUTES, false)),
-            Arrays.asList(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
-    boolean showAll = "".equals(plan.getPipeSinkName());
-    for (PipeSink pipeSink : SyncService.getInstance().getAllPipeSink()) {
-      if (showAll || plan.getPipeSinkName().equals(pipeSink.getPipeSinkName())) {
-        RowRecord record = new RowRecord(0);
-        record.addField(Binary.valueOf(pipeSink.getPipeSinkName()), TSDataType.TEXT);
-        record.addField(Binary.valueOf(pipeSink.getType().name()), TSDataType.TEXT);
-        record.addField(Binary.valueOf(pipeSink.showAllAttributes()), TSDataType.TEXT);
-        listDataSet.putRecord(record);
-      }
-    }
-    return listDataSet;
-  }
-
-  private QueryDataSet processShowPipeSinkType() {
-    ListDataSet listDataSet =
-        new ListDataSet(
-            Arrays.asList(new PartialPath(COLUMN_PIPESINK_TYPE, false)),
-            Arrays.asList(TSDataType.TEXT));
-    for (PipeSink.PipeSinkType pipeSinkType : PipeSink.PipeSinkType.values()) {
-      RowRecord record = new RowRecord(0);
-      record.addField(Binary.valueOf(pipeSinkType.name()), TSDataType.TEXT);
-      listDataSet.putRecord(record);
-    }
-    return listDataSet;
-  }
-
-  private QueryDataSet processShowPipes(ShowPipePlan plan) {
-    ListDataSet listDataSet =
-        new ListDataSet(
-            Arrays.asList(
-                new PartialPath(COLUMN_PIPE_CREATE_TIME, false),
-                new PartialPath(COLUMN_PIPE_NAME, false),
-                new PartialPath(COLUMN_PIPE_ROLE, false),
-                new PartialPath(COLUMN_PIPE_REMOTE, false),
-                new PartialPath(COLUMN_PIPE_STATUS, false),
-                new PartialPath(COLUMN_PIPE_MSG, false)),
-            Arrays.asList(
-                TSDataType.TEXT,
-                TSDataType.TEXT,
-                TSDataType.TEXT,
-                TSDataType.TEXT,
-                TSDataType.TEXT,
-                TSDataType.TEXT));
-    SyncService.getInstance().showPipe(plan, listDataSet);
-    // sort by create time
-    listDataSet.sort(Comparator.comparing(o -> o.getFields().get(0).getStringValue()));
-    return listDataSet;
-  }
-
   // high Cognitive Complexity
 
   protected QueryDataSet processAuthorQuery(AuthorPlan plan) throws QueryProcessException {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreatePipeOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreatePipeOperator.java
index 44dc6ee3d7..911f8e5057 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreatePipeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreatePipeOperator.java
@@ -23,13 +23,12 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
 import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
 
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 
+// TODO: remove this
 public class CreatePipeOperator extends Operator {
   private String pipeName;
   private String pipeSinkName;
@@ -57,13 +56,6 @@ public class CreatePipeOperator extends Operator {
   @Override
   public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
       throws QueryProcessException {
-    CreatePipePlan plan = new CreatePipePlan(pipeName, pipeSinkName);
-    plan.setDataStartTimestamp(startTime);
-    Iterator<Map.Entry<String, String>> iterator = pipeAttributes.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Map.Entry<String, String> entry = iterator.next();
-      plan.addPipeAttribute(entry.getKey(), entry.getValue());
-    }
-    return plan;
+    return null;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreatePipeSinkOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreatePipeSinkOperator.java
index d0de62f325..bdf90daabf 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreatePipeSinkOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreatePipeSinkOperator.java
@@ -23,13 +23,12 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
 import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
 
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 
+// TODO: remove this
 public class CreatePipeSinkOperator extends Operator {
   private String pipeSinkName;
   private String pipeSinkType;
@@ -50,12 +49,6 @@ public class CreatePipeSinkOperator extends Operator {
   @Override
   public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
       throws QueryProcessException {
-    CreatePipeSinkPlan plan = new CreatePipeSinkPlan(pipeSinkName, pipeSinkType);
-    Iterator<Map.Entry<String, String>> iterator = pipeSinkAttributes.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Map.Entry<String, String> entry = iterator.next();
-      plan.addPipeSinkAttribute(entry.getKey(), entry.getValue());
-    }
-    return plan;
+    return null;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/DropPipeOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/DropPipeOperator.java
index ea98dc017a..7cc8e5b4e0 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/DropPipeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/DropPipeOperator.java
@@ -23,9 +23,9 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.OperatePipePlan;
 import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
 
+// TODO: remove this
 public class DropPipeOperator extends Operator {
   private String pipeName;
 
@@ -39,6 +39,6 @@ public class DropPipeOperator extends Operator {
   @Override
   public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
       throws QueryProcessException {
-    return new OperatePipePlan(pipeName, operatorType);
+    return null;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/DropPipeSinkOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/DropPipeSinkOperator.java
index 9341e06b56..6ea8e0bed1 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/DropPipeSinkOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/DropPipeSinkOperator.java
@@ -23,9 +23,9 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.DropPipeSinkPlan;
 import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
 
+// TODO: remove this
 public class DropPipeSinkOperator extends Operator {
   private String pipeSinkName;
 
@@ -38,6 +38,6 @@ public class DropPipeSinkOperator extends Operator {
   @Override
   public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
       throws QueryProcessException {
-    return new DropPipeSinkPlan(pipeSinkName);
+    return null;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeOperator.java
index b6070eea7a..5f77a1b3aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeOperator.java
@@ -22,9 +22,9 @@ package org.apache.iotdb.db.qp.logical.sys;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipePlan;
 import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
 
+// TODO: remove this
 public class ShowPipeOperator extends ShowOperator {
   private String pipeName;
 
@@ -40,6 +40,6 @@ public class ShowPipeOperator extends ShowOperator {
   @Override
   public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
       throws QueryProcessException {
-    return new ShowPipePlan(pipeName);
+    return null;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeSinkOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeSinkOperator.java
index 7a528c1c5a..4554e38754 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeSinkOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeSinkOperator.java
@@ -22,9 +22,9 @@ package org.apache.iotdb.db.qp.logical.sys;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipeSinkPlan;
 import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
 
+// TODO: remove this
 public class ShowPipeSinkOperator extends ShowOperator {
   private String pipeSinkName;
 
@@ -40,6 +40,6 @@ public class ShowPipeSinkOperator extends ShowOperator {
   @Override
   public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
       throws QueryProcessException {
-    return new ShowPipeSinkPlan(pipeSinkName);
+    return null;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeSinkTypeOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeSinkTypeOperator.java
index 0ff83f1d52..429a96dece 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeSinkTypeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeSinkTypeOperator.java
@@ -22,9 +22,9 @@ package org.apache.iotdb.db.qp.logical.sys;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipeSinkTypePlan;
 import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
 
+// TODO: remove this
 public class ShowPipeSinkTypeOperator extends ShowOperator {
   public ShowPipeSinkTypeOperator() {
     super(SQLConstant.TOK_SHOW_PIPESINKTYPE, OperatorType.SHOW_PIPESINKTYPE);
@@ -33,6 +33,6 @@ public class ShowPipeSinkTypeOperator extends ShowOperator {
   @Override
   public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
       throws QueryProcessException {
-    return new ShowPipeSinkTypePlan();
+    return null;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StartPipeOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StartPipeOperator.java
index d471122d41..d4c0ceca63 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StartPipeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StartPipeOperator.java
@@ -23,9 +23,9 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.OperatePipePlan;
 import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
 
+// TODO: remove this
 public class StartPipeOperator extends Operator {
   private String pipeName;
 
@@ -39,6 +39,6 @@ public class StartPipeOperator extends Operator {
   @Override
   public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
       throws QueryProcessException {
-    return new OperatePipePlan(pipeName, operatorType);
+    return null;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StopPipeOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StopPipeOperator.java
index c76500d46c..0a2a33a645 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StopPipeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StopPipeOperator.java
@@ -23,9 +23,9 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.OperatePipePlan;
 import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
 
+// TODO: remove this
 public class StopPipeOperator extends Operator {
   private String pipeName;
 
@@ -39,6 +39,6 @@ public class StopPipeOperator extends Operator {
   @Override
   public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
       throws QueryProcessException {
-    return new OperatePipePlan(pipeName, operatorType);
+    return null;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipePlan.java
deleted file mode 100644
index aac5972d96..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipePlan.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.qp.physical.sys;
-
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.sync.utils.SyncConstant;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.tsfile.utils.Pair;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class CreatePipePlan extends PhysicalPlan {
-  private String pipeName;
-  private String pipeSinkName;
-  private long dataStartTimestamp;
-  private List<Pair<String, String>> pipeAttributes;
-
-  public CreatePipePlan(String pipeName, String pipeSinkName) {
-    super(Operator.OperatorType.CREATE_PIPE);
-    this.pipeName = pipeName;
-    this.pipeSinkName = pipeSinkName;
-    dataStartTimestamp = 0;
-    pipeAttributes = new ArrayList<>();
-  }
-
-  public void setDataStartTimestamp(long dataStartTimestamp) {
-    this.dataStartTimestamp = dataStartTimestamp;
-  }
-
-  public void addPipeAttribute(String attr, String value) {
-    pipeAttributes.add(new Pair<>(attr, value));
-  }
-
-  public String getPipeName() {
-    return pipeName;
-  }
-
-  public String getPipeSinkName() {
-    return pipeSinkName;
-  }
-
-  public long getDataStartTimestamp() {
-    return dataStartTimestamp;
-  }
-
-  public List<Pair<String, String>> getPipeAttributes() {
-    return pipeAttributes;
-  }
-
-  @Override
-  public List<? extends PartialPath> getPaths() {
-    return Collections.emptyList();
-  }
-
-  public static CreatePipePlan parseString(String parsedString) throws IOException {
-    String[] attributes = parsedString.split(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
-    if (attributes.length < 4) {
-      throw new IOException("Parsing CreatePipePlan error. Attributes is less than expected.");
-    }
-    CreatePipePlan plan = new CreatePipePlan(attributes[0], attributes[1]);
-    plan.setDataStartTimestamp(Long.parseLong(attributes[2]));
-    int size = (Integer.parseInt(attributes[3]) << 1);
-    if (attributes.length != (size + 4)) {
-      throw new IOException("Parsing CreatePipePlan error. Attributes number is wrong.");
-    }
-    for (int i = 0; i < size; i += 2) {
-      plan.addPipeAttribute(attributes[i + 4], attributes[i + 5]);
-    }
-    return plan;
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder builder = new StringBuilder();
-    builder.append(pipeName).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
-    builder.append(pipeSinkName).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
-    builder.append(dataStartTimestamp).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
-    builder.append(pipeAttributes.size()).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
-    for (int i = 0; i < pipeAttributes.size(); i++) {
-      builder
-          .append(pipeAttributes.get(i).left)
-          .append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
-      builder
-          .append(pipeAttributes.get(i).right)
-          .append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
-    }
-    return builder.toString();
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipeSinkPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipeSinkPlan.java
deleted file mode 100644
index 61b0b36e65..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipeSinkPlan.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.qp.physical.sys;
-
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.sync.utils.SyncConstant;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.tsfile.utils.Pair;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class CreatePipeSinkPlan extends PhysicalPlan {
-  private String pipeSinkName;
-  private String pipeSinkType;
-  private List<Pair<String, String>> pipeSinkAttributes;
-
-  public CreatePipeSinkPlan(String pipeSinkName, String pipeSinkType) {
-    super(Operator.OperatorType.CREATE_PIPESINK);
-    this.pipeSinkName = pipeSinkName;
-    this.pipeSinkType = pipeSinkType;
-    pipeSinkAttributes = new ArrayList<>();
-  }
-
-  public void addPipeSinkAttribute(String attr, String value) {
-    pipeSinkAttributes.add(new Pair<>(attr, value));
-  }
-
-  public String getPipeSinkName() {
-    return pipeSinkName;
-  }
-
-  public String getPipeSinkType() {
-    return pipeSinkType;
-  }
-
-  public List<Pair<String, String>> getPipeSinkAttributes() {
-    return pipeSinkAttributes;
-  }
-
-  @Override
-  public List<? extends PartialPath> getPaths() {
-    return Collections.emptyList();
-  }
-
-  public static CreatePipeSinkPlan parseString(String parsedString) throws IOException {
-    String[] attributes = parsedString.split(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
-    if (attributes.length < 3) {
-      throw new IOException("Parsing CreatePipeSinkPlan error. Attributes is less than expected.");
-    }
-    CreatePipeSinkPlan plan = new CreatePipeSinkPlan(attributes[0], attributes[1]);
-    int size = (Integer.parseInt(attributes[2]) << 1);
-    if (attributes.length != (size + 3)) {
-      throw new IOException("Parsing CreatePipeSinkPlan error. Attributes number is wrong.");
-    }
-    for (int i = 0; i < size; i += 2) {
-      plan.addPipeSinkAttribute(attributes[i + 3], attributes[i + 4]);
-    }
-    return plan;
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder builder = new StringBuilder();
-    builder.append(pipeSinkName).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
-    builder.append(pipeSinkType).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
-    builder.append(pipeSinkAttributes.size()).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
-    for (int i = 0; i < pipeSinkAttributes.size(); i++) {
-      builder
-          .append(pipeSinkAttributes.get(i).left)
-          .append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
-      builder
-          .append(pipeSinkAttributes.get(i).right)
-          .append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
-    }
-    return builder.toString();
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DropPipeSinkPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DropPipeSinkPlan.java
deleted file mode 100644
index 4da8de3c9e..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DropPipeSinkPlan.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.qp.physical.sys;
-
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-
-import java.util.Collections;
-import java.util.List;
-
-public class DropPipeSinkPlan extends PhysicalPlan {
-  private String pipeSinkName;
-
-  public DropPipeSinkPlan(String pipeSinkName) {
-    super(Operator.OperatorType.DROP_PIPESINK);
-    this.pipeSinkName = pipeSinkName;
-  }
-
-  public String getPipeSinkName() {
-    return pipeSinkName;
-  }
-
-  @Override
-  public List<? extends PartialPath> getPaths() {
-    return Collections.emptyList();
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/OperatePipePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/OperatePipePlan.java
deleted file mode 100644
index 6af563c5c3..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/OperatePipePlan.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.qp.physical.sys;
-
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-
-import java.util.Collections;
-import java.util.List;
-
-public class OperatePipePlan extends PhysicalPlan {
-  private String pipeName;
-
-  public OperatePipePlan(String pipeName, Operator.OperatorType type) {
-    super(type);
-    this.pipeName = pipeName;
-  }
-
-  public String getPipeName() {
-    return pipeName;
-  }
-
-  @Override
-  public List<? extends PartialPath> getPaths() {
-    return Collections.emptyList();
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipePlan.java
deleted file mode 100644
index bbdbdf2043..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipePlan.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.qp.physical.sys;
-
-public class ShowPipePlan extends ShowPlan {
-  private String pipeName;
-
-  public ShowPipePlan(String pipeName) {
-    super(ShowContentType.PIPE);
-    this.pipeName = pipeName;
-  }
-
-  public String getPipeName() {
-    return pipeName;
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeSinkPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeSinkPlan.java
deleted file mode 100644
index 038a681349..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeSinkPlan.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.qp.physical.sys;
-
-public class ShowPipeSinkPlan extends ShowPlan {
-  private String pipeSinkName;
-
-  public ShowPipeSinkPlan(String pipeSinkName) {
-    super(ShowContentType.PIPESINK);
-    this.pipeSinkName = pipeSinkName;
-  }
-
-  public String getPipeSinkName() {
-    return pipeSinkName;
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeSinkTypePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeSinkTypePlan.java
deleted file mode 100644
index fe29622c66..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeSinkTypePlan.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.qp.physical.sys;
-
-public class ShowPipeSinkTypePlan extends ShowPlan {
-  public ShowPipeSinkTypePlan() {
-    super(ShowContentType.PIPESINKTYPE);
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
index 0398a2f014..86f3e95112 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
@@ -127,8 +127,5 @@ public class ShowPlan extends PhysicalPlan {
     NODES_IN_SCHEMA_TEMPLATE,
     PATHS_SET_SCHEMA_TEMPLATE,
     PATHS_USING_SCHEMA_TEMPLATE,
-    PIPESINK,
-    PIPESINKTYPE,
-    PIPE,
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
index 7cce9048df..1051723306 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.sync;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.ShutdownException;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.exception.sync.PipeException;
@@ -39,16 +38,12 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipePlan;
 import org.apache.iotdb.db.qp.utils.DateTimeUtils;
-import org.apache.iotdb.db.query.dataset.ListDataSet;
 import org.apache.iotdb.db.sync.common.ClusterSyncInfoFetcher;
 import org.apache.iotdb.db.sync.common.ISyncInfoFetcher;
 import org.apache.iotdb.db.sync.common.LocalSyncInfoFetcher;
 import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginManager;
 import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginRegister;
-import org.apache.iotdb.db.sync.externalpipe.ExternalPipeStatus;
 import org.apache.iotdb.db.sync.sender.manager.ISyncManager;
 import org.apache.iotdb.db.sync.sender.pipe.ExternalPipeSink;
 import org.apache.iotdb.db.sync.sender.pipe.Pipe;
@@ -60,9 +55,6 @@ import org.apache.iotdb.pipe.external.api.IExternalPipeSinkWriterFactory;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
 import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.utils.Binary;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.thrift.TException;
@@ -140,14 +132,6 @@ public class SyncService implements IService {
     return syncInfoFetcher.getPipeSink(name);
   }
 
-  // TODO(sync): delete this in new-standalone version
-  public void addPipeSink(CreatePipeSinkPlan plan) throws PipeSinkException {
-    TSStatus status = syncInfoFetcher.addPipeSink(plan);
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new PipeSinkException(status.message);
-    }
-  }
-
   public void addPipeSink(CreatePipeSinkStatement createPipeSinkStatement)
       throws PipeSinkException {
     logger.info("Add PIPESINK {}", createPipeSinkStatement);
@@ -425,67 +409,6 @@ public class SyncService implements IService {
     return list;
   }
 
-  // TODO(sync): delete this in new-standalone version
-  public void showPipe(ShowPipePlan plan, ListDataSet listDataSet) {
-    boolean showAll = "".equals(plan.getPipeName());
-    // show pipe in sender
-    for (PipeInfo pipe : SyncService.getInstance().getAllPipeInfos()) {
-      if (showAll || plan.getPipeName().equals(pipe.getPipeName())) {
-        try {
-          RowRecord record = new RowRecord(0);
-          record.addField(
-              Binary.valueOf(DateTimeUtils.convertLongToDate(pipe.getCreateTime())),
-              TSDataType.TEXT);
-          record.addField(Binary.valueOf(pipe.getPipeName()), TSDataType.TEXT);
-          record.addField(Binary.valueOf(IoTDBConstant.SYNC_SENDER_ROLE), TSDataType.TEXT);
-          record.addField(Binary.valueOf(pipe.getPipeSinkName()), TSDataType.TEXT);
-          record.addField(Binary.valueOf(pipe.getStatus().name()), TSDataType.TEXT);
-          PipeSink pipeSink = syncInfoFetcher.getPipeSink(pipe.getPipeSinkName());
-          if (pipeSink.getType() == PipeSink.PipeSinkType.ExternalPipe) { // for external pipe
-            ExtPipePluginManager extPipePluginManager =
-                SyncService.getInstance().getExternalPipeManager(pipe.getPipeName());
-
-            if (extPipePluginManager != null) {
-              String extPipeType = ((ExternalPipeSink) pipeSink).getExtPipeSinkTypeName();
-              ExternalPipeStatus externalPipeStatus =
-                  extPipePluginManager.getExternalPipeStatus(extPipeType);
-
-              // TODO(ext-pipe): Adapting to the new syntax of SHOW PIPE
-              if (externalPipeStatus != null) {
-                record.addField(
-                    Binary.valueOf(
-                        externalPipeStatus.getWriterInvocationFailures().toString()
-                            + ";"
-                            + externalPipeStatus.getWriterStatuses().toString()),
-                    TSDataType.TEXT);
-              }
-            }
-          } else {
-            record.addField(Binary.valueOf(pipe.getMessageType().name()), TSDataType.TEXT);
-          }
-          listDataSet.putRecord(record);
-        } catch (Exception e) {
-          logger.error("failed to show pipe [{}] because {}", pipe.getPipeName(), e.getMessage());
-        }
-      }
-    }
-    // show pipe in receiver
-    List<TSyncIdentityInfo> identityInfoList = receiverManager.getAllTSyncIdentityInfos();
-    for (TSyncIdentityInfo identityInfo : identityInfoList) {
-      // TODO(sync): Removing duplicate rows
-      RowRecord record = new RowRecord(0);
-      record.addField(
-          Binary.valueOf(DateTimeUtils.convertLongToDate(identityInfo.getCreateTime())),
-          TSDataType.TEXT);
-      record.addField(Binary.valueOf(identityInfo.getPipeName()), TSDataType.TEXT);
-      record.addField(Binary.valueOf(IoTDBConstant.SYNC_RECEIVER_ROLE), TSDataType.TEXT);
-      record.addField(Binary.valueOf(identityInfo.getAddress()), TSDataType.TEXT);
-      record.addField(Binary.valueOf(PipeStatus.RUNNING.name()), TSDataType.TEXT);
-      record.addField(Binary.valueOf(PipeMessage.PipeMessageType.NORMAL.name()), TSDataType.TEXT);
-      listDataSet.putRecord(record);
-    }
-  }
-
   // endregion
 
   // region Interfaces and Implementation of External-Pipe
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
index 8021d9815c..33254a4547 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.db.client.ConfigNodeClient;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
 import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
 import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
 import org.apache.iotdb.rpc.RpcUtils;
 
@@ -55,11 +54,6 @@ public class ClusterSyncInfoFetcher implements ISyncInfoFetcher {
 
   // region Interfaces of PipeSink
 
-  @Override
-  public TSStatus addPipeSink(CreatePipeSinkPlan plan) {
-    return RpcUtils.SUCCESS_STATUS;
-  }
-
   @Override
   public TSStatus addPipeSink(CreatePipeSinkStatement createPipeSinkStatement) {
     return RpcUtils.SUCCESS_STATUS;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
index 30839df25a..89f04c4e0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
@@ -24,15 +24,12 @@ import org.apache.iotdb.commons.sync.pipe.PipeInfo;
 import org.apache.iotdb.commons.sync.pipe.PipeMessage;
 import org.apache.iotdb.commons.sync.pipesink.PipeSink;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
 
 import java.util.List;
 
 public interface ISyncInfoFetcher {
 
   // region Interfaces of PipeSink
-  // TODO(sync): delete this in new-standalone version
-  TSStatus addPipeSink(CreatePipeSinkPlan plan);
 
   TSStatus addPipeSink(CreatePipeSinkStatement createPipeSinkStatement);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java
index fd331d572d..c30d5cc852 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.commons.sync.pipe.SyncOperation;
 import org.apache.iotdb.commons.sync.pipesink.PipeSink;
 import org.apache.iotdb.commons.sync.utils.SyncPathUtil;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
 import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
 
 import org.slf4j.Logger;
@@ -68,15 +67,6 @@ public class LocalSyncInfo {
 
   // region Implement of PipeSink
 
-  // TODO: delete this in new-standalone version
-  public void addPipeSink(CreatePipeSinkPlan plan) throws PipeSinkException, IOException {
-    syncMetadata.checkAddPipeSink(plan.getPipeSinkName());
-    PipeSink pipeSink = SyncPipeUtil.parseCreatePipeSinkPlan(plan);
-    // should guarantee the adding pipesink is not exist.
-    syncMetadata.addPipeSink(pipeSink);
-    syncLogWriter.addPipeSink(pipeSink);
-  }
-
   public void addPipeSink(CreatePipeSinkStatement createPipeSinkStatement)
       throws PipeSinkException, IOException {
     syncMetadata.checkAddPipeSink(createPipeSinkStatement.getPipeSinkName());
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
index ce50511013..923828ceda 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.commons.sync.pipe.SyncOperation;
 import org.apache.iotdb.commons.sync.pipesink.PipeSink;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -47,15 +46,6 @@ public class LocalSyncInfoFetcher implements ISyncInfoFetcher {
   }
 
   // region Implement of PipeSink
-  @Override
-  public TSStatus addPipeSink(CreatePipeSinkPlan plan) {
-    try {
-      localSyncInfo.addPipeSink(plan);
-      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-    } catch (PipeSinkException | IOException e) {
-      return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
-    }
-  }
 
   @Override
   public TSStatus addPipeSink(CreatePipeSinkStatement createPipeSinkStatement) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/PipeData.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/PipeData.java
index 7d9adf1249..18d41d2f35 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/PipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/PipeData.java
@@ -82,9 +82,6 @@ public abstract class PipeData {
       case DELETION:
         pipeData = new DeletionPipeData();
         break;
-      case SCHEMA:
-        pipeData = new SchemaPipeData();
-        break;
       default:
         logger.error("Deserialize PipeData error because Unknown type {}.", type);
         throw new UnsupportedOperationException(
@@ -103,6 +100,5 @@ public abstract class PipeData {
   public enum PipeDataType {
     TSFILE,
     DELETION,
-    SCHEMA
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/SchemaPipeData.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/SchemaPipeData.java
deleted file mode 100644
index ee1d342f24..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/SchemaPipeData.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.sync.pipedata;
-
-import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.sync.receiver.load.ILoader;
-import org.apache.iotdb.db.sync.receiver.load.SchemaLoader;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Objects;
-
-public class SchemaPipeData extends PipeData {
-  private static final int SERIALIZE_BUFFER_SIZE = 1024;
-
-  private PhysicalPlan plan;
-
-  public SchemaPipeData() {
-    super();
-  }
-
-  public SchemaPipeData(PhysicalPlan plan, long serialNumber) {
-    super(serialNumber);
-    this.plan = plan;
-  }
-
-  @Override
-  public PipeDataType getType() {
-    return PipeDataType.SCHEMA;
-  }
-
-  @Override
-  public long serialize(DataOutputStream stream) throws IOException {
-    long serializeSize = super.serialize(stream);
-    byte[] bytes = getBytes();
-    stream.writeInt(bytes.length);
-    stream.write(bytes);
-    serializeSize += (Integer.BYTES + bytes.length);
-    return serializeSize;
-  }
-
-  private byte[] getBytes() {
-    ByteBuffer buffer = ByteBuffer.allocate(SERIALIZE_BUFFER_SIZE);
-    plan.serialize(buffer);
-    byte[] bytes = new byte[buffer.position()];
-    buffer.flip();
-    buffer.get(bytes);
-    return bytes;
-  }
-
-  public void deserialize(DataInputStream stream) throws IOException, IllegalPathException {
-    super.deserialize(stream);
-    byte[] bytes = new byte[stream.readInt()];
-    stream.read(bytes);
-    plan = PhysicalPlan.Factory.create(ByteBuffer.wrap(bytes));
-  }
-
-  @Override
-  public ILoader createLoader() {
-    return new SchemaLoader(plan);
-  }
-
-  @Override
-  public String toString() {
-    return "SchemaPipeData{" + "serialNumber=" + serialNumber + ", plan=" + plan + '}';
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-    SchemaPipeData that = (SchemaPipeData) o;
-    return Objects.equals(plan, that.plan) && Objects.equals(serialNumber, that.serialNumber);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(plan, serialNumber);
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/SchemaLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/SchemaLoader.java
deleted file mode 100644
index 34cb369780..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/SchemaLoader.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.sync.receiver.load;
-
-import org.apache.iotdb.commons.exception.sync.PipeDataLoadException;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This loader is used to PhysicalPlan. Support four types of physical plans: CREATE_TIMESERIES |
- * CREATE_ALIGNED_TIMESERIES | DELETE_TIMESERIES | SET_STORAGE_GROUP
- */
-public class SchemaLoader implements ILoader {
-  private static final Logger logger = LoggerFactory.getLogger(SchemaLoader.class);
-  private static PlanExecutor planExecutor;
-
-  static {
-    try {
-      planExecutor = new PlanExecutor();
-    } catch (QueryProcessException e) {
-      logger.error(e.getMessage());
-    }
-  }
-
-  private final PhysicalPlan plan;
-
-  public SchemaLoader(PhysicalPlan plan) {
-    this.plan = plan;
-  }
-
-  @Override
-  public void load() throws PipeDataLoadException {
-    try {
-      planExecutor.processNonQuery(plan);
-    } catch (QueryProcessException e) {
-      if (e.getCause() instanceof StorageGroupAlreadySetException) {
-        logger.warn(
-            "Sync receiver try to set storage group "
-                + ((StorageGroupAlreadySetException) e.getCause()).getStorageGroupPath()
-                + " that has already been set");
-      } else {
-        throw new PipeDataLoadException(e.getMessage());
-      }
-    } catch (StorageEngineException | StorageGroupNotSetException e) {
-      throw new PipeDataLoadException(e.getMessage());
-    }
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java
index f377bb1f5a..37d7b994f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java
@@ -23,11 +23,9 @@ import org.apache.iotdb.commons.exception.sync.PipeDataLoadException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.LoadFileException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -39,18 +37,8 @@ import java.io.File;
 /** This loader is used to load tsFiles. If .mods file exists, it will be loaded as well. */
 public class TsFileLoader implements ILoader {
   private static final Logger logger = LoggerFactory.getLogger(TsFileLoader.class);
-  private static PlanExecutor planExecutor;
-
-  static {
-    try {
-      planExecutor = new PlanExecutor();
-    } catch (QueryProcessException e) {
-      logger.error(e.getMessage());
-    }
-  }
 
   private final File tsFile;
-  // TODO(sync): use storage group to support auto create schema
   private final String storageGroup;
 
   public TsFileLoader(File tsFile, String storageGroup) {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java b/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java
index 73669ebbcd..d87e07bab5 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java
@@ -28,30 +28,14 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
 import org.apache.iotdb.db.sync.sender.pipe.Pipe;
 import org.apache.iotdb.db.sync.sender.pipe.PipeSinkFactory;
 import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
-import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.util.Map;
 
 public class SyncPipeUtil {
 
-  // TODO(sync): delete this in new-standalone version
-  public static PipeSink parseCreatePipeSinkPlan(CreatePipeSinkPlan plan) throws PipeSinkException {
-    PipeSink pipeSink;
-    try {
-      pipeSink = PipeSinkFactory.createPipeSink(plan.getPipeSinkType(), plan.getPipeSinkName());
-    } catch (UnsupportedOperationException e) {
-      throw new PipeSinkException(e.getMessage());
-    }
-
-    pipeSink.setAttribute(plan.getPipeSinkAttributes());
-    return pipeSink;
-  }
-
   public static PipeSink parseCreatePipeSinkStatement(
       CreatePipeSinkStatement createPipeSinkStatement) throws PipeSinkException {
     PipeSink pipeSink;
@@ -67,27 +51,6 @@ public class SyncPipeUtil {
     return pipeSink;
   }
 
-  // TODO(sync): delete this in new-standalone version
-  public static PipeInfo parseCreatePipePlanAsPipeInfo(CreatePipePlan plan, long pipeCreateTime)
-      throws PipeException {
-    boolean syncDelOp = false;
-    for (Pair<String, String> pair : plan.getPipeAttributes()) {
-      pair.left = pair.left.toLowerCase();
-      if ("syncdelop".equals(pair.left)) {
-        syncDelOp = Boolean.parseBoolean(pair.right);
-      } else {
-        throw new PipeException(String.format("Can not recognition attribute %s", pair.left));
-      }
-    }
-
-    return new TsFilePipeInfo(
-        plan.getPipeName(),
-        plan.getPipeSinkName(),
-        pipeCreateTime,
-        plan.getDataStartTimestamp(),
-        syncDelOp);
-  }
-
   public static PipeInfo parseCreatePipeStatementAsPipeInfo(
       CreatePipeStatement createPipeStatement, long pipeCreateTime) throws PipeException {
     boolean syncDelOp = false;
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/persistence/LocalSyncInfoTest.java b/server/src/test/java/org/apache/iotdb/db/sync/persistence/LocalSyncInfoTest.java
index 032db8479b..92350d049e 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/persistence/LocalSyncInfoTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/persistence/LocalSyncInfoTest.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.commons.sync.pipe.PipeMessage;
 import org.apache.iotdb.commons.sync.pipe.SyncOperation;
 import org.apache.iotdb.commons.sync.pipe.TsFilePipeInfo;
 import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.sync.common.LocalSyncInfo;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 
@@ -35,6 +35,8 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 public class LocalSyncInfoTest {
   private static final String pipe1 = "pipe1";
@@ -56,9 +58,13 @@ public class LocalSyncInfoTest {
   public void testOperatePipe() throws Exception {
     LocalSyncInfo localSyncInfo = new LocalSyncInfo();
     try {
-      CreatePipeSinkPlan createPipeSinkPlan = new CreatePipeSinkPlan("demo", "iotdb");
-      createPipeSinkPlan.addPipeSinkAttribute("ip", "127.0.0.1");
-      createPipeSinkPlan.addPipeSinkAttribute("port", "6670");
+      CreatePipeSinkStatement createPipeSinkStatement = new CreatePipeSinkStatement();
+      createPipeSinkStatement.setPipeSinkName("demo");
+      createPipeSinkStatement.setPipeSinkType("IoTDB");
+      Map<String, String> attributes = new HashMap<>();
+      attributes.put("ip", "127.0.0.1");
+      attributes.put("port", "6667");
+      createPipeSinkStatement.setAttributes(attributes);
       try {
         localSyncInfo.addPipe(new TsFilePipeInfo(pipe1, "demo", createdTime1, 0, true));
         Assert.fail();
@@ -66,7 +72,7 @@ public class LocalSyncInfoTest {
         Assert.assertTrue(e instanceof PipeSinkNotExistException);
         // throw exception because can not find pipeSink
       }
-      localSyncInfo.addPipeSink(createPipeSinkPlan);
+      localSyncInfo.addPipeSink(createPipeSinkStatement);
       localSyncInfo.addPipe(new TsFilePipeInfo(pipe1, "demo", createdTime1, 0, true));
       localSyncInfo.addPipe(new TsFilePipeInfo(pipe2, "demo", createdTime2, 0, true));
       try {