You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/01/26 01:54:02 UTC

[iotdb] branch new_sync updated: [To new_sync][IOTDB-1907] implement customized sync process: sender 2 (#4974)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch new_sync
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/new_sync by this push:
     new 442e6fa  [To new_sync][IOTDB-1907] implement customized sync process: sender 2 (#4974)
442e6fa is described below

commit 442e6fafbb5d6a752719b8e76975c338d5f3e47a
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Wed Jan 26 09:53:21 2022 +0800

    [To new_sync][IOTDB-1907] implement customized sync process: sender 2 (#4974)
---
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4    |   8 +
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |   6 +-
 .../db/integration/sync/IoTDBSyncSenderIT.java     |  35 ++--
 .../iotdb/db/engine/modification/Deletion.java     |  43 ++---
 .../db/newsync/pipedata/DeletionPipeData.java      |  82 +++++++++
 .../apache/iotdb/db/newsync/pipedata/PipeData.java |  97 ++++++++++
 .../iotdb/db/newsync/pipedata/SchemaPipeData.java  |  83 +++++++++
 .../iotdb/db/newsync/pipedata/TsFilePipeData.java  | 128 +++++++++++++
 .../iotdb/db/newsync/sender/pipe/PipeSink.java     |   3 +-
 .../iotdb/db/newsync/sender/pipe/TsFilePipe.java   | 135 ++++++++------
 .../db/newsync/sender/pipe/TsFilePipeData.java     | 198 ---------------------
 .../newsync/sender/recovery/SenderLogAnalyzer.java |   3 +-
 .../db/newsync/sender/recovery/TsFilePipeLog.java  |  62 ++++---
 .../sender/recovery/TsFilePipeLogAnalyzer.java     |  32 ++--
 .../db/newsync/sender/service/SenderService.java   |  20 +--
 .../org/apache/iotdb/db/qp/logical/Operator.java   |   2 +
 16 files changed, 578 insertions(+), 359 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4
index f406677..9fad63a 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4
@@ -362,6 +362,10 @@ PIPE
     : P I P E
     ;
 
+PIPES
+    : P I P E S
+    ;
+
 PIPESERVER
     : P I P E S E R V E R
     ;
@@ -370,6 +374,10 @@ PIPESINK
     : P I P E S I N K
     ;
 
+PIPESINKS
+    : P I P E S I N K S
+    ;
+
 PIPESINKTYPE
     : P I P E S I N K T Y P E
     ;
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 24dbc79..7e4cd62 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -666,7 +666,7 @@ unloadFile
 
 // pipesink statement
 createPipeSink
-    : CREATE PIPESINK pipeSinkType=ID (LR_BRACKET syncAttributeClauses RR_BRACKET)? AS pipeSinkName=ID
+    : CREATE PIPESINK pipeSinkName=ID AS pipeSinkType=ID (LR_BRACKET syncAttributeClauses RR_BRACKET)?
     ;
 
 showPipeSinkType
@@ -674,7 +674,7 @@ showPipeSinkType
     ;
 
 showPipeSink
-    : SHOW PIPESINK (pipeSinkName=ID)?
+    : SHOW ((PIPESINK (pipeSinkName=ID)?) | PIPESINKS)
     ;
 
 dropPipeSink
@@ -687,7 +687,7 @@ createPipe
     ;
 
 showPipe
-    : SHOW PIPE (pipeName=ID)?
+    : SHOW ((PIPE (pipeName=ID)?) | PIPES)
     ;
 
 stopPipe
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/PipeSink.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
similarity index 56%
copy from server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/PipeSink.java
copy to integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
index 4b58c78..9a36b9b 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/PipeSink.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
@@ -15,32 +15,23 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
-package org.apache.iotdb.db.newsync.sender.pipe;
-
-import org.apache.iotdb.db.exception.PipeSinkException;
-
-public interface PipeSink {
-  void setAttribute(String attr, String value) throws PipeSinkException;
+package org.apache.iotdb.db.integration.sync;
 
-  String getName();
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.itbase.category.LocalStandaloneTest;
 
-  Type getType();
+import org.junit.Before;
+import org.junit.experimental.categories.Category;
 
-  String showAllAttributes();
-
-  enum Type {
-    IoTDB;
-  }
+import java.io.IOException;
 
-  class PipeSinkFactory {
-    public static PipeSink createPipeSink(String type, String name) {
-      type = type.toLowerCase();
-      if (Type.IoTDB.name().toLowerCase().equals(type)) {
-        return new IoTDBPipeSink(name);
-      }
-      throw new UnsupportedOperationException("Not support for " + type + " pipeSink");
-    }
+@Category({LocalStandaloneTest.class})
+public class IoTDBSyncSenderIT {
+  @Before
+  public void setUp() throws StorageEngineException, IOException {
+    EnvironmentUtils.cleanEnv();
+    EnvironmentUtils.envSetUp();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java b/server/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java
index 10924ab..edb8d4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java
@@ -23,7 +23,9 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import java.nio.ByteBuffer;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.Objects;
 
 /** Deletion is a delete operation on a timeseries. */
@@ -75,33 +77,22 @@ public class Deletion extends Modification {
     this.endTime = timestamp;
   }
 
-  public void serializeWithoutFileOffset(ByteBuffer byteBuffer) {
-    byteBuffer.mark();
-    try {
-      byteBuffer.putLong(startTime);
-      byteBuffer.putLong(endTime);
-      ReadWriteIOUtils.write(getPathString(), byteBuffer);
-    } catch (Exception e) {
-      byteBuffer.reset();
-      throw e;
-    }
+  public long serializeWithoutFileOffset(DataOutputStream stream) throws IOException {
+    long serializeSize = 0;
+    stream.writeLong(startTime);
+    serializeSize += Long.BYTES;
+    stream.writeLong(endTime);
+    serializeSize += Long.BYTES;
+    serializeSize += ReadWriteIOUtils.write(getPathString(), stream);
+    return serializeSize;
   }
 
-  public static Deletion deserializeWithoutFileOffset(ByteBuffer byteBuffer)
-      throws IllegalPathException {
-    byteBuffer.mark();
-    Deletion deletion;
-    try {
-      long startTime = byteBuffer.getLong();
-      long endTime = byteBuffer.getLong();
-      deletion =
-          new Deletion(
-              new PartialPath(ReadWriteIOUtils.readString(byteBuffer)), 0, startTime, endTime);
-    } catch (Exception e) {
-      byteBuffer.reset();
-      throw e;
-    }
-    return deletion;
+  public static Deletion deserializeWithoutFileOffset(DataInputStream stream)
+      throws IOException, IllegalPathException {
+    long startTime = stream.readLong();
+    long endTime = stream.readLong();
+    return new Deletion(
+        new PartialPath(ReadWriteIOUtils.readString(stream)), 0, startTime, endTime);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/DeletionPipeData.java b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/DeletionPipeData.java
new file mode 100644
index 0000000..99d5638
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/DeletionPipeData.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.iotdb.db.newsync.pipedata;
+
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public class DeletionPipeData extends PipeData {
+  private static final Logger logger = LoggerFactory.getLogger(DeletionPipeData.class);
+
+  private Deletion deletion;
+
+  public DeletionPipeData(long serialNumber) {
+    super(serialNumber);
+  }
+
+  public DeletionPipeData(Deletion deletion, long serialNumber) {
+    super(serialNumber);
+    this.deletion = deletion;
+  }
+
+  @Override
+  public Type getType() {
+    return Type.DELETION;
+  }
+
+  @Override
+  public long serializeImpl(DataOutputStream stream) throws IOException {
+    return deletion.serializeWithoutFileOffset(stream);
+  }
+
+  @Override
+  public void deserializeImpl(DataInputStream stream) throws IOException, IllegalPathException {
+    this.deletion = Deletion.deserializeWithoutFileOffset(stream);
+  }
+
+  @Override
+  public void sendToTransport() {
+    ByteArrayOutputStream bytesStream = new ByteArrayOutputStream();
+    DataOutputStream stream = new DataOutputStream(bytesStream);
+    try {
+      serialize(stream);
+      // senderTransport(bytesStream.toArray, this);
+      System.out.println(this);
+    } catch (IOException e) {
+      logger.warn(
+          String.format(
+              "Serialize deletion pipeData %s error, can not send to transport, because %s.",
+              this, e));
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "DeletionData{" + "deletion=" + deletion + ", serialNumber=" + serialNumber + '}';
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/PipeData.java b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/PipeData.java
new file mode 100644
index 0000000..0575d5c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/PipeData.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.iotdb.db.newsync.pipedata;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public abstract class PipeData {
+
+  protected final long serialNumber;
+
+  public PipeData(long serialNumber) {
+    this.serialNumber = serialNumber;
+  }
+
+  public long getSerialNumber() {
+    return serialNumber;
+  }
+
+  //    abstract public Loader.Type getLoaderType() {
+  //      if (tsFilePath != null) {
+  //        return Loader.Type.TsFile;
+  //      } else if (deletion != null) {
+  //        return Loader.Type.Deletion;
+  //      } else if (plan != null) {
+  //        return Loader.Type.PhysicalPlan;
+  //      }
+  //      logger.error("Wrong type for transport type.");
+  //      return null;
+  //    }
+
+  public abstract Type getType();
+
+  public long serialize(DataOutputStream stream) throws IOException {
+    long serializeSize = 0;
+    stream.writeLong(serialNumber);
+    serializeSize += Long.BYTES;
+    stream.writeByte((byte) getType().ordinal());
+    serializeSize += Byte.BYTES;
+    serializeSize += serializeImpl(stream);
+    return serializeSize;
+  }
+
+  public abstract long serializeImpl(DataOutputStream stream) throws IOException;
+
+  public static PipeData deserialize(DataInputStream stream)
+      throws IOException, IllegalPathException {
+    long serialNumber = stream.readLong();
+    Type type = Type.values()[stream.readByte()];
+
+    PipeData pipeData = null;
+    switch (type) {
+      case TSFILE:
+        pipeData = new TsFilePipeData(serialNumber);
+        break;
+      case DELETION:
+        pipeData = new DeletionPipeData(serialNumber);
+        break;
+      case PHYSICALPLAN:
+        pipeData = new SchemaPipeData(serialNumber);
+        break;
+    }
+    pipeData.deserializeImpl(stream);
+    return pipeData;
+  }
+
+  public abstract void deserializeImpl(DataInputStream stream)
+      throws IOException, IllegalPathException;
+
+  public abstract void sendToTransport();
+
+  public enum Type {
+    TSFILE,
+    DELETION,
+    PHYSICALPLAN
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/SchemaPipeData.java b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/SchemaPipeData.java
new file mode 100644
index 0000000..28f068a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/SchemaPipeData.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.iotdb.db.newsync.pipedata;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class SchemaPipeData extends PipeData {
+  private static final int SERIALIZE_BUFFER_SIZE = 1024;
+
+  private PhysicalPlan plan;
+
+  public SchemaPipeData(long serialNumber) {
+    super(serialNumber);
+  }
+
+  public SchemaPipeData(PhysicalPlan plan, long serialNumber) {
+    super(serialNumber);
+    this.plan = plan;
+  }
+
+  @Override
+  public Type getType() {
+    return Type.PHYSICALPLAN;
+  }
+
+  @Override
+  public long serializeImpl(DataOutputStream stream) throws IOException {
+    byte[] bytes = getBytes();
+    stream.writeInt(bytes.length);
+    stream.write(bytes);
+    return Integer.BYTES + bytes.length;
+  }
+
+  private byte[] getBytes() {
+    ByteBuffer buffer = ByteBuffer.allocate(SERIALIZE_BUFFER_SIZE);
+    plan.serialize(buffer);
+    byte[] bytes = new byte[buffer.position()];
+    buffer.flip();
+    buffer.get(bytes);
+    return bytes;
+  }
+
+  @Override
+  public void deserializeImpl(DataInputStream stream) throws IOException, IllegalPathException {
+    byte[] bytes = new byte[stream.readInt()];
+    stream.read(bytes);
+    this.plan = PhysicalPlan.Factory.create(ByteBuffer.wrap(bytes));
+  }
+
+  @Override
+  public void sendToTransport() {
+    // senderTransport(getBytes(), this);
+    System.out.println(this);
+  }
+
+  @Override
+  public String toString() {
+    return "SchemaPipeData{" + "serialNumber=" + serialNumber + ", plan=" + plan + '}';
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/TsFilePipeData.java b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/TsFilePipeData.java
new file mode 100644
index 0000000..503486a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/TsFilePipeData.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.iotdb.db.newsync.pipedata;
+
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TsFilePipeData extends PipeData {
+  private static final Logger logger = LoggerFactory.getLogger(TsFilePipeData.class);
+
+  private String tsFilePath;
+
+  public TsFilePipeData(long serialNumber) {
+    super(serialNumber);
+  }
+
+  public TsFilePipeData(String tsFilePath, long serialNumber) {
+    super(serialNumber);
+    this.tsFilePath = tsFilePath;
+  }
+
+  @Override
+  public Type getType() {
+    return Type.TSFILE;
+  }
+
+  @Override
+  public long serializeImpl(DataOutputStream stream) throws IOException {
+    return ReadWriteIOUtils.write(tsFilePath, stream);
+  }
+
+  @Override
+  public void deserializeImpl(DataInputStream stream) throws IOException {
+    this.tsFilePath = ReadWriteIOUtils.readString(stream);
+  }
+
+  @Override
+  public void sendToTransport() {
+    if (waitForTsFileClose()) {
+      // senderTransprot(getFiles(), this);
+      System.out.println(this);
+    }
+  }
+
+  private boolean waitForTsFileClose() {
+    for (int i = 0; i < SenderConf.defaultWaitingForTsFileRetryNumber; i++) {
+      if (isTsFileClosed()) {
+        return true;
+      }
+      try {
+        Thread.sleep(SenderConf.defaultWaitingForTsFileCloseMilliseconds);
+      } catch (InterruptedException e) {
+        logger.warn(String.format("Be Interrupted when waiting for tsfile %s closed", tsFilePath));
+      }
+      logger.info(
+          String.format(
+              "Waiting for tsfile %s close, retry %d / %d.",
+              tsFilePath, (i + 1), SenderConf.defaultWaitingForTsFileRetryNumber));
+    }
+    return false;
+  }
+
+  private boolean isTsFileClosed() {
+    File tsFile = new File(tsFilePath).getAbsoluteFile();
+    File resource = new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
+    return resource.exists();
+  }
+
+  public List<File> getTsFiles() throws FileNotFoundException {
+    File tsFile = new File(tsFilePath).getAbsoluteFile();
+    File resource = new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
+    File mods = new File(tsFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX);
+
+    List<File> files = new ArrayList<>();
+    if (!tsFile.exists()) {
+      throw new FileNotFoundException(String.format("Can not find %s.", tsFile.getAbsolutePath()));
+    }
+    files.add(tsFile);
+    if (resource.exists()) {
+      files.add(resource);
+    }
+    if (mods.exists()) {
+      files.add(mods);
+    }
+    return files;
+  }
+
+  @Override
+  public String toString() {
+    return "TsFilePipeData{"
+        + "serialNumber="
+        + serialNumber
+        + ", tsFilePath='"
+        + tsFilePath
+        + '\''
+        + '}';
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/PipeSink.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/PipeSink.java
index 4b58c78..50383c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/PipeSink.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/PipeSink.java
@@ -40,7 +40,8 @@ public interface PipeSink {
       if (Type.IoTDB.name().toLowerCase().equals(type)) {
         return new IoTDBPipeSink(name);
       }
-      throw new UnsupportedOperationException("Not support for " + type + " pipeSink");
+      throw new UnsupportedOperationException(
+          String.format("Do not support pipeSink type %s", type));
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/TsFilePipe.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/TsFilePipe.java
index d0929068..99e6d44 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/TsFilePipe.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/TsFilePipe.java
@@ -27,6 +27,10 @@ import org.apache.iotdb.db.engine.storagegroup.virtualSg.StorageGroupManager;
 import org.apache.iotdb.db.exception.PipeException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.newsync.pipedata.DeletionPipeData;
+import org.apache.iotdb.db.newsync.pipedata.PipeData;
+import org.apache.iotdb.db.newsync.pipedata.SchemaPipeData;
+import org.apache.iotdb.db.newsync.pipedata.TsFilePipeData;
 import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
 import org.apache.iotdb.db.newsync.sender.recovery.TsFilePipeLog;
 import org.apache.iotdb.db.newsync.sender.recovery.TsFilePipeLogAnalyzer;
@@ -55,22 +59,22 @@ public class TsFilePipe implements Pipe {
 
   private final long createTime;
   private final String name;
-  private final IoTDBPipeSink pipeSink;
+  private final PipeSink pipeSink;
   private final long dataStartTime;
   private final boolean syncDelOp;
 
-  private ExecutorService singleExecutorService;
-  private TsFilePipeLog pipeLog;
+  private final ExecutorService singleExecutorService;
+  private final TsFilePipeLog pipeLog;
   private final ReentrantLock collectRealTimeDataLock;
 
-  private BlockingDeque<TsFilePipeData> pipeData;
+  private BlockingDeque<PipeData> pipeDataDeque;
   private long maxSerialNumber;
 
   private PipeStatus status;
   private boolean isCollectingRealTimeData;
 
   public TsFilePipe(
-      long createTime, String name, IoTDBPipeSink pipeSink, long dataStartTime, boolean syncDelOp) {
+      long createTime, String name, PipeSink pipeSink, long dataStartTime, boolean syncDelOp) {
     this.createTime = createTime;
     this.name = name;
     this.pipeSink = pipeSink;
@@ -91,10 +95,10 @@ public class TsFilePipe implements Pipe {
   }
 
   private void recover() {
-    this.pipeData = new TsFilePipeLogAnalyzer(this).recover();
+    this.pipeDataDeque = new TsFilePipeLogAnalyzer(this).recover();
     this.maxSerialNumber = 0L;
-    if (pipeData.size() != 0) {
-      this.maxSerialNumber = Math.max(maxSerialNumber, pipeData.getLast().getSerialNumber());
+    if (pipeDataDeque.size() != 0) {
+      this.maxSerialNumber = Math.max(maxSerialNumber, pipeDataDeque.getLast().getSerialNumber());
     }
   }
 
@@ -120,6 +124,7 @@ public class TsFilePipe implements Pipe {
       }
 
       singleExecutorService.submit(this::transport);
+      //
       status = PipeStatus.RUNNING;
     } catch (IOException e) {
       logger.error(
@@ -138,13 +143,13 @@ public class TsFilePipe implements Pipe {
     // get all history data
     int historyMetadataSize = historyMetadata.size();
     int historyTsFilesSize = historyTsFiles.size();
-    List<TsFilePipeData> historyData = new ArrayList<>();
+    List<PipeData> historyData = new ArrayList<>();
     for (int i = 0; i < historyMetadataSize; i++) {
-      long serialNumber = 1 - historyTsFilesSize - i;
-      historyData.add(new TsFilePipeData(historyMetadata.get(i), serialNumber));
+      long serialNumber = 1 - historyTsFilesSize - historyMetadataSize + i;
+      historyData.add(new SchemaPipeData(historyMetadata.get(i), serialNumber));
     }
     for (int i = 0; i < historyTsFilesSize; i++) {
-      long serialNumber = 1 - i;
+      long serialNumber = 1 - historyTsFilesSize + i;
       try {
         File hardLink =
             pipeLog.addHistoryTsFile(historyTsFiles.get(i).left, historyTsFiles.get(i).right);
@@ -160,11 +165,11 @@ public class TsFilePipe implements Pipe {
     // add history data into blocking deque
     int historyDataSize = historyData.size();
     for (int i = 0; i < historyDataSize; i++) {
-      pipeData.addFirst(historyData.get(historyDataSize - 1 - i));
+      pipeDataDeque.addFirst(historyData.get(historyDataSize - 1 - i));
     }
     // record history data
     for (int i = 0; i < historyDataSize; i++) {
-      TsFilePipeData data = historyData.get(i);
+      PipeData data = historyData.get(i);
       try {
         pipeLog.addHistoryPipeData(data);
       } catch (IOException e) {
@@ -206,8 +211,8 @@ public class TsFilePipe implements Pipe {
     collectRealTimeDataLock.lock();
     try {
       maxSerialNumber += 1L;
-      TsFilePipeData metaData = new TsFilePipeData(plan, maxSerialNumber);
-      pipeData.offer(metaData); // ensure can be transport
+      PipeData metaData = new SchemaPipeData(plan, maxSerialNumber);
+      collectRealTimePipeData(metaData); // ensure can be transport
       pipeLog.addRealTimePipeData(metaData);
     } catch (IOException e) {
       logger.warn(
@@ -264,9 +269,9 @@ public class TsFilePipe implements Pipe {
                 deletion.getStartTime(),
                 deletion.getEndTime());
         maxSerialNumber += 1L;
-        TsFilePipeData deletionData = new TsFilePipeData(splitDeletion, maxSerialNumber);
+        PipeData deletionData = new DeletionPipeData(splitDeletion, maxSerialNumber);
         pipeLog.addRealTimePipeData(deletionData);
-        pipeData.offer(deletionData);
+        collectRealTimePipeData(deletionData);
       }
     } catch (MetadataException e) {
       logger.warn(String.format("Collect deletion %s error, because %s.", deletion, e));
@@ -284,10 +289,10 @@ public class TsFilePipe implements Pipe {
     collectRealTimeDataLock.lock();
     try {
       maxSerialNumber += 1L;
-      TsFilePipeData tsFileData =
+      PipeData tsFileData =
           new TsFilePipeData(pipeLog.addRealTimeTsFile(tsFile).getPath(), maxSerialNumber);
       pipeLog.addRealTimePipeData(tsFileData);
-      pipeData.offer(tsFileData);
+      collectRealTimePipeData(tsFileData);
     } catch (IOException e) {
       logger.warn(
           String.format(
@@ -308,6 +313,13 @@ public class TsFilePipe implements Pipe {
     }
   }
 
+  private void collectRealTimePipeData(PipeData data) {
+    pipeDataDeque.offer(data);
+    synchronized (pipeDataDeque) {
+      pipeDataDeque.notifyAll();
+    }
+  }
+
   /** transport data * */
   private void transport() {
     // handshake
@@ -318,14 +330,14 @@ public class TsFilePipe implements Pipe {
           break;
         }
 
-        TsFilePipeData data;
+        PipeData data;
         try {
-          synchronized (pipeData) {
-            if (pipeData.isEmpty()) {
-              pipeData.wait();
-              pipeData.notifyAll();
+          synchronized (pipeDataDeque) {
+            if (pipeDataDeque.isEmpty()) {
+              pipeDataDeque.wait();
+              pipeDataDeque.notifyAll();
             }
-            data = pipeData.poll();
+            data = pipeDataDeque.poll();
           }
         } catch (InterruptedException e) {
           logger.warn(String.format("TsFile pipe %s has been interrupted.", name));
@@ -335,38 +347,51 @@ public class TsFilePipe implements Pipe {
         if (data == null) {
           continue;
         }
-        if (data.isTsFile()) {
-          if (waitForTsFileClose(data)) {
-            // senderTransport(data.getTsFiles, data.getLoaderType());
-          }
-        } else {
-          // senderTransport(data.getBytes, data.getLoaderType());
-        }
-        // pipeLog.removePipeData(data.getSerialNumber);
+        //        data.sendToTransport();
+        //         pipeLog.removePipeData(data.getSerialNumber);
+        //        data.sendToTransport();
+        //        Thread.sleep(1000);
+        //        pipeLog.removePipeData(data.getSerialNumber());
       }
     } catch (Exception e) {
       logger.error(String.format("TsFile pipe %s stops transportng data, because %s.", name, e));
     }
   }
 
-  private boolean waitForTsFileClose(TsFilePipeData data) {
-    for (int i = 0; i < SenderConf.defaultWaitingForTsFileRetryNumber; i++) {
-      if (data.isTsFileClosed()) {
-        return true;
+  public List<PipeData> pull(long serialNumber) {
+    if (pipeDataDeque.isEmpty()) {
+      return null;
+    }
+    List<PipeData> pullPipeData = new ArrayList<>();
+    PipeData data = pipeDataDeque.poll();
+    while (data.getSerialNumber() <= serialNumber) {
+      pullPipeData.add(data);
+      if (pipeDataDeque.isEmpty()) {
+        break;
+      } else {
+        data = pipeDataDeque.poll();
       }
+    }
+
+    int pullPipeDataSize = pullPipeData.size();
+    for (int i = 0; i < pullPipeDataSize; i++) {
+      pipeDataDeque.addFirst(pullPipeData.get(pullPipeDataSize - i - 1));
+    }
+    return pullPipeData;
+  }
+
+  public void commit(long serialNumber) {
+    while (!pipeDataDeque.isEmpty() && pipeDataDeque.peek().getSerialNumber() <= serialNumber) {
+      PipeData data = pipeDataDeque.poll();
       try {
-        Thread.sleep(SenderConf.defaultWaitingForTsFileCloseMilliseconds);
-      } catch (InterruptedException e) {
+        pipeLog.removePipeData(pipeDataDeque.poll().getSerialNumber());
+      } catch (IOException e) {
         logger.warn(
             String.format(
-                "Be Interrupted when waiting for tsfile %s closed", data.getTsFilePath()));
+                "Commit pipe data %s error, serial number is %s, because %s",
+                data, data.getSerialNumber(), e));
       }
-      logger.info(
-          String.format(
-              "Waiting for tsfile %s close, retry %d / %d.",
-              data.getTsFilePath(), (i + 1), SenderConf.defaultWaitingForTsFileRetryNumber));
     }
-    return false;
   }
 
   @Override
@@ -375,13 +400,15 @@ public class TsFilePipe implements Pipe {
       throw new PipeException(
           String.format("Can not stop pipe %s, because the pipe is drop.", name));
     }
-    if (status == PipeStatus.STOP) {
-      return;
-    }
 
+    if (!isCollectingRealTimeData) {
+      registerMetadata();
+      registerTsFile();
+      isCollectingRealTimeData = true;
+    }
     status = PipeStatus.STOP;
-    synchronized (pipeData) {
-      pipeData.notifyAll();
+    synchronized (pipeDataDeque) {
+      pipeDataDeque.notifyAll();
     }
   }
 
@@ -392,8 +419,8 @@ public class TsFilePipe implements Pipe {
     }
 
     status = PipeStatus.DROP;
-    synchronized (pipeData) {
-      pipeData.notifyAll();
+    synchronized (pipeDataDeque) {
+      pipeDataDeque.notifyAll();
     }
     clear();
   }
@@ -418,7 +445,7 @@ public class TsFilePipe implements Pipe {
       logger.warn(String.format("Clear pipe %s %d error, because %s.", name, createTime, e));
     }
 
-    pipeData = null;
+    pipeDataDeque = null;
     isCollectingRealTimeData = false;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/TsFilePipeData.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/TsFilePipeData.java
deleted file mode 100644
index 52e19a3..0000000
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/TsFilePipeData.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.newsync.sender.pipe;
-
-import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.engine.modification.ModificationFile;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-public class TsFilePipeData {
-  private static final Logger logger = LoggerFactory.getLogger(TsFilePipeData.class);
-  private static final int SERIALIZE_BUFFER_SIZE = 256;
-
-  private final long serialNumber;
-  private String tsFilePath;
-  private Deletion deletion;
-  private PhysicalPlan plan;
-
-  public TsFilePipeData(String tsFilePath, long serialNumber) {
-    this.tsFilePath = tsFilePath;
-    this.serialNumber = serialNumber;
-  }
-
-  public TsFilePipeData(Deletion deletion, long serialNumber) {
-    this.deletion = deletion;
-    this.serialNumber = serialNumber;
-  }
-
-  public TsFilePipeData(PhysicalPlan plan, long serialNumber) {
-    this.plan = plan;
-    this.serialNumber = serialNumber;
-  }
-
-  public boolean isTsFile() {
-    return tsFilePath != null;
-  }
-
-  public boolean isTsFileClosed() {
-    File tsFile = new File(tsFilePath).getAbsoluteFile();
-    File resource = new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
-    return resource.exists();
-  }
-
-  public String getTsFilePath() {
-    return tsFilePath == null ? "null" : tsFilePath;
-  }
-
-  public long getSerialNumber() {
-    return serialNumber;
-  }
-
-  //    public Loader.Type getLoaderType() {
-  //      if (tsFilePath != null) {
-  //        return Loader.Type.TsFile;
-  //      } else if (deletion != null) {
-  //        return Loader.Type.Deletion;
-  //      } else if (plan != null) {
-  //        return Loader.Type.PhysicalPlan;
-  //      }
-  //      logger.error("Wrong type for transport type.");
-  //      return null;
-  //    }
-
-  public List<File> getTsFiles() throws FileNotFoundException {
-    File tsFile = new File(tsFilePath).getAbsoluteFile();
-    File resource = new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
-    File mods = new File(tsFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX);
-
-    List<File> files = new ArrayList<>();
-    if (!tsFile.exists()) {
-      throw new FileNotFoundException(String.format("Can not find %s.", tsFile.getAbsolutePath()));
-    }
-    files.add(tsFile);
-    if (resource.exists()) {
-      files.add(resource);
-    }
-    if (mods.exists()) {
-      files.add(mods);
-    }
-    return files;
-  }
-
-  public byte[] getBytes() {
-    ByteBuffer buffer = ByteBuffer.allocate(SERIALIZE_BUFFER_SIZE);
-    if (deletion != null) {
-      deletion.serializeWithoutFileOffset(buffer);
-    } else if (plan != null) {
-      plan.serialize(buffer);
-    } else if (tsFilePath != null) {
-      ReadWriteIOUtils.write(tsFilePath, buffer);
-    } else {
-      logger.error(String.format("Null tsfile pipe data with serialNumber %d.", serialNumber));
-    }
-
-    byte[] bytes = new byte[buffer.position()];
-    buffer.flip();
-    buffer.get(bytes);
-    return bytes;
-  }
-
-  public long serialize(DataOutputStream stream) throws IOException {
-    long serializeSize = 0;
-
-    stream.writeLong(serialNumber);
-    serializeSize += Long.BYTES;
-    if (tsFilePath != null) {
-      stream.writeByte((byte) Type.TSFILE.ordinal());
-    } else if (deletion != null) {
-      stream.writeByte((byte) Type.DELETION.ordinal());
-    } else if (plan != null) {
-      stream.writeByte((byte) Type.PHYSICALPLAN.ordinal());
-    } else {
-      logger.error(String.format("Null tsfile pipe data with serialNumber %d.", serialNumber));
-    }
-    serializeSize += Byte.BYTES;
-
-    byte[] bytes = getBytes();
-    stream.writeInt(bytes.length);
-    stream.write(bytes);
-    serializeSize += Integer.BYTES;
-    serializeSize += bytes.length;
-    return serializeSize;
-  }
-
-  public static TsFilePipeData deserialize(DataInputStream stream)
-      throws IOException, IllegalPathException {
-    long serialNumber = stream.readLong();
-    Type type = Type.values()[stream.readByte()];
-    byte[] bytes = new byte[stream.readInt()];
-    stream.read(bytes);
-
-    TsFilePipeData pipeData = null;
-    if (type.equals(Type.TSFILE)) {
-      pipeData = new TsFilePipeData(ReadWriteIOUtils.readString(stream), serialNumber);
-    } else if (type.equals(Type.DELETION)) {
-      pipeData =
-          new TsFilePipeData(
-              Deletion.deserializeWithoutFileOffset(ByteBuffer.wrap(bytes)), serialNumber);
-    } else if (type.equals(Type.PHYSICALPLAN)) {
-      pipeData =
-          new TsFilePipeData(PhysicalPlan.Factory.create(ByteBuffer.wrap(bytes)), serialNumber);
-    }
-
-    return pipeData;
-  }
-
-  public enum Type {
-    TSFILE,
-    DELETION,
-    PHYSICALPLAN
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder builder = new StringBuilder("TsFilePipeData{");
-    builder.append("serialNumber=" + serialNumber);
-    if (tsFilePath != null) {
-      builder.append(", tsfile=" + tsFilePath);
-    } else if (deletion != null) {
-      builder.append(", deletion=" + deletion);
-    } else if (plan != null) {
-      builder.append(", physicalplan=" + plan);
-    }
-    builder.append("}");
-    return builder.toString();
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/SenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/SenderLogAnalyzer.java
index 9e8582c..8f2364d 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/SenderLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/SenderLogAnalyzer.java
@@ -113,8 +113,7 @@ public class SenderLogAnalyzer {
       }
     } catch (Exception e) {
       throw new IOException(
-          String.format(
-              "Recover error in line %d : %s, because %s", lineNumber, readLine, e.getMessage()));
+          String.format("Recover error in line %d : %s, because %s", lineNumber, readLine, e));
     }
 
     if (pipes.size() > 0) {
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLog.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLog.java
index 60fc40d..68a09d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLog.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLog.java
@@ -22,9 +22,10 @@ package org.apache.iotdb.db.newsync.sender.recovery;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.newsync.pipedata.PipeData;
+import org.apache.iotdb.db.newsync.pipedata.TsFilePipeData;
 import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
 import org.apache.iotdb.db.newsync.sender.pipe.TsFilePipe;
-import org.apache.iotdb.db.newsync.sender.pipe.TsFilePipeData;
 import org.apache.iotdb.db.utils.FileUtils;
 
 import org.slf4j.Logger;
@@ -143,7 +144,7 @@ public class TsFilePipeLog {
   }
 
   /** add pipe log data */
-  public void addHistoryPipeData(TsFilePipeData pipeData) throws IOException {
+  public void addHistoryPipeData(PipeData pipeData) throws IOException {
     getHistoryOutputStream();
     pipeData.serialize(historyOutputStream);
   }
@@ -158,10 +159,10 @@ public class TsFilePipeLog {
     logDir.mkdirs();
     File historyPipeLog = new File(pipeLogDir, SenderConf.historyPipeLogName);
     createFile(historyPipeLog);
-    historyOutputStream = new DataOutputStream(new FileOutputStream(historyPipeLog));
+    historyOutputStream = new DataOutputStream(new FileOutputStream(historyPipeLog, true));
   }
 
-  public synchronized void addRealTimePipeData(TsFilePipeData pipeData) throws IOException {
+  public synchronized void addRealTimePipeData(PipeData pipeData) throws IOException {
     getRealTimeOutputStream(pipeData.getSerialNumber());
     currentPipeLogSize += pipeData.serialize(realTimeOutputStream);
   }
@@ -169,25 +170,15 @@ public class TsFilePipeLog {
   private void getRealTimeOutputStream(long serialNumber) throws IOException {
     if (realTimeOutputStream == null) {
       // recover real time pipe log
-      realTimePipeLogStartNumber = new LinkedBlockingDeque<>();
-      File logDir = new File(pipeLogDir);
-      List<Long> startNumbers = new ArrayList<>();
-
-      logDir.mkdirs();
-      for (File file : logDir.listFiles())
-        if (file.getName().endsWith(SenderConf.realTimePipeLogNameSuffix)) {
-          startNumbers.add(SenderConf.getSerialNumberFromPipeLogName(file.getName()));
-        }
-      if (startNumbers.size() != 0) {
-        Collections.sort(startNumbers);
-        for (Long startTime : startNumbers) {
-          realTimePipeLogStartNumber.offer(startTime);
-        }
+      if (realTimePipeLogStartNumber == null) {
+        recoverRealTimePipeLogStartNumber();
+      }
+      if (!realTimePipeLogStartNumber.isEmpty()) {
         File writingPipeLog =
             new File(
                 pipeLogDir,
-                SenderConf.getRealTimePipeLogName(startNumbers.get(startNumbers.size() - 1)));
-        realTimeOutputStream = new DataOutputStream(new FileOutputStream(writingPipeLog));
+                SenderConf.getRealTimePipeLogName(realTimePipeLogStartNumber.peekLast()));
+        realTimeOutputStream = new DataOutputStream(new FileOutputStream(writingPipeLog, true));
         currentPipeLogSize = writingPipeLog.length();
       } else {
         moveToNextPipeLog(serialNumber);
@@ -199,6 +190,24 @@ public class TsFilePipeLog {
     }
   }
 
+  private void recoverRealTimePipeLogStartNumber() {
+    realTimePipeLogStartNumber = new LinkedBlockingDeque<>();
+    File logDir = new File(pipeLogDir);
+    List<Long> startNumbers = new ArrayList<>();
+
+    logDir.mkdirs();
+    for (File file : logDir.listFiles())
+      if (file.getName().endsWith(SenderConf.realTimePipeLogNameSuffix)) {
+        startNumbers.add(SenderConf.getSerialNumberFromPipeLogName(file.getName()));
+      }
+    if (startNumbers.size() != 0) {
+      Collections.sort(startNumbers);
+      for (Long startTime : startNumbers) {
+        realTimePipeLogStartNumber.offer(startTime);
+      }
+    }
+  }
+
   private void moveToNextPipeLog(long startSerialNumber) throws IOException {
     if (realTimeOutputStream != null) {
       realTimeOutputStream.close();
@@ -219,7 +228,10 @@ public class TsFilePipeLog {
       if (historyOutputStream != null) {
         removeHistoryPipeLog();
       }
-      if (realTimePipeLogStartNumber.size() < 2) {
+      if (realTimePipeLogStartNumber == null) {
+        recoverRealTimePipeLogStartNumber();
+      }
+      if (realTimePipeLogStartNumber.size() >= 2) {
         long pipeLogStartNumber;
         while (true) {
           pipeLogStartNumber = realTimePipeLogStartNumber.poll();
@@ -260,11 +272,11 @@ public class TsFilePipeLog {
 
   private void removeTsFile(File realTimePipeLog) {
     try {
-      List<TsFilePipeData> pipeData = TsFilePipeLogAnalyzer.parseFile(realTimePipeLog);
+      List<PipeData> pipeData = TsFilePipeLogAnalyzer.parseFile(realTimePipeLog);
       List<File> tsFiles;
-      for (TsFilePipeData data : pipeData)
-        if (data.isTsFile()) {
-          tsFiles = data.getTsFiles();
+      for (PipeData data : pipeData)
+        if (PipeData.Type.TSFILE.equals(data.getType())) {
+          tsFiles = ((TsFilePipeData) data).getTsFiles();
           for (File file : tsFiles) {
             Files.deleteIfExists(file.toPath());
           }
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogAnalyzer.java
index cd83d49..d24598f 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogAnalyzer.java
@@ -20,9 +20,9 @@
 package org.apache.iotdb.db.newsync.sender.recovery;
 
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.newsync.pipedata.PipeData;
 import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
 import org.apache.iotdb.db.newsync.sender.pipe.TsFilePipe;
-import org.apache.iotdb.db.newsync.sender.pipe.TsFilePipeData;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,7 +47,7 @@ public class TsFilePipeLogAnalyzer {
   private final String pipeDir;
   private final String pipeLogDir;
 
-  private BlockingDeque<TsFilePipeData> pipeData;
+  private BlockingDeque<PipeData> pipeDataDeque;
   private long removeSerialNumber;
 
   public TsFilePipeLogAnalyzer(TsFilePipe pipe) {
@@ -55,19 +55,19 @@ public class TsFilePipeLogAnalyzer {
     pipeLogDir = new File(pipeDir, SenderConf.pipeLogDirName).getPath();
   }
 
-  public BlockingDeque<TsFilePipeData> recover() {
-    pipeData = new LinkedBlockingDeque<>();
+  public BlockingDeque<PipeData> recover() {
+    pipeDataDeque = new LinkedBlockingDeque<>();
     removeSerialNumber = Long.MIN_VALUE;
 
     if (!new File(pipeDir).exists()) {
-      return pipeData;
+      return pipeDataDeque;
     }
 
     deserializeRemoveSerialNumber();
     recoverHistoryData();
     recoverRealTimeData();
 
-    return pipeData;
+    return pipeDataDeque;
   }
 
   private void deserializeRemoveSerialNumber() {
@@ -105,15 +105,15 @@ public class TsFilePipeLogAnalyzer {
 
     if (removeSerialNumber < 0) {
       try {
-        List<TsFilePipeData> historyPipeData = parseFile(historyPipeLog);
-        for (TsFilePipeData data : historyPipeData)
+        List<PipeData> historyPipeData = parseFile(historyPipeLog);
+        for (PipeData data : historyPipeData)
           if (data.getSerialNumber() > removeSerialNumber) {
-            pipeData.offer(data);
+            pipeDataDeque.offer(data);
           }
       } catch (IOException e) {
         logger.error(
             String.format(
-                "Can not parse history pipe log %s, because %s", historyPipeLog.getPath()));
+                "Can not parse history pipe log %s, because %s", historyPipeLog.getPath(), e));
       }
     } else {
       try {
@@ -143,10 +143,10 @@ public class TsFilePipeLogAnalyzer {
         File realTimePipeLog =
             new File(this.pipeLogDir, SenderConf.getRealTimePipeLogName(startNumber));
         try {
-          List<TsFilePipeData> realTimeData = parseFile(realTimePipeLog);
-          for (TsFilePipeData data : realTimeData)
+          List<PipeData> realTimeData = parseFile(realTimePipeLog);
+          for (PipeData data : realTimeData)
             if (data.getSerialNumber() > removeSerialNumber) {
-              pipeData.offer(data);
+              pipeDataDeque.offer(data);
             }
         } catch (IOException e) {
           logger.error(
@@ -161,12 +161,12 @@ public class TsFilePipeLogAnalyzer {
     return new File(pipeDir, SenderConf.pipeCollectFinishLockName).exists();
   }
 
-  public static List<TsFilePipeData> parseFile(File file) throws IOException {
-    List<TsFilePipeData> pipeData = new ArrayList<>();
+  public static List<PipeData> parseFile(File file) throws IOException {
+    List<PipeData> pipeData = new ArrayList<>();
     DataInputStream inputStream = new DataInputStream(new FileInputStream(file));
     try {
       while (true) {
-        pipeData.add(TsFilePipeData.deserialize(inputStream));
+        pipeData.add(PipeData.deserialize(inputStream));
       }
     } catch (EOFException e) {
       logger.info(String.format("Finish parsing pipeLog %s.", file.getPath()));
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/service/SenderService.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/service/SenderService.java
index 498c714..8e1ad25 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/service/SenderService.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/service/SenderService.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.exception.PipeException;
 import org.apache.iotdb.db.exception.PipeSinkException;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
-import org.apache.iotdb.db.newsync.sender.pipe.IoTDBPipeSink;
 import org.apache.iotdb.db.newsync.sender.pipe.Pipe;
 import org.apache.iotdb.db.newsync.sender.pipe.PipeSink;
 import org.apache.iotdb.db.newsync.sender.pipe.TsFilePipe;
@@ -177,18 +176,15 @@ public class SenderService implements IService {
     }
 
     // get TsFilePipe
-    PipeSink.Type pipeSinkType = pipeSink.getType();
-    if (!pipeSinkType.equals(PipeSink.Type.IoTDB)) {
-      throw new PipeException(
-          String.format(
-              "Wrong pipeSink type %s for create TsFilePipe.", pipeSinkType)); // internal error
-    }
+    //    PipeSink.Type pipeSinkType = pipeSink.getType();
+    //    if (!pipeSinkType.equals(PipeSink.Type.IoTDB)) {
+    //      throw new PipeException(
+    //          String.format(
+    //              "Wrong pipeSink type %s for create TsFilePipe.", pipeSinkType)); // internal
+    // error
+    //    }
     return new TsFilePipe(
-        pipeCreateTime,
-        plan.getPipeName(),
-        (IoTDBPipeSink) pipeSink,
-        plan.getDataStartTimestamp(),
-        syncDelOp);
+        pipeCreateTime, plan.getPipeName(), pipeSink, plan.getDataStartTimestamp(), syncDelOp);
   }
 
   public void stopPipe(String pipeName) throws PipeException {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index 2a82e6e..4aa910e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -193,7 +193,9 @@ public abstract class Operator {
     PRUNE_TEMPLATE,
     APPEND_TEMPLATE,
     DROP_TEMPLATE,
+
     SHOW_QUERY_RESOURCE,
+
     CREATE_PIPESINK,
     DROP_PIPESINK,
     SHOW_PIPESINK,