You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/01/11 13:24:02 UTC

[iotdb] branch WalDirectBuffer updated: use netty buffer

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch WalDirectBuffer
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/WalDirectBuffer by this push:
     new 704b352  use netty buffer
704b352 is described below

commit 704b35270d172e0b27c93a8e3cd6fd5e7cda4bfa
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Mon Jan 11 21:23:29 2021 +0800

    use netty buffer
---
 server/pom.xml                                     |   5 +
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |  23 +++-
 .../iotdb/db/qp/physical/crud/DeletePlan.java      |  15 +++
 .../iotdb/db/qp/physical/crud/InsertRowPlan.java   |  70 ++++++++++++
 .../physical/crud/InsertRowsOfOneDevicePlan.java   |  15 +++
 .../db/qp/physical/crud/InsertTabletPlan.java      | 123 +++++++++++++++++++++
 .../iotdb/db/qp/physical/sys/AuthorPlan.java       |  28 +++++
 .../iotdb/db/qp/physical/sys/ChangeAliasPlan.java  |  18 ++-
 .../db/qp/physical/sys/ChangeTagOffsetPlan.java    |  18 ++-
 .../iotdb/db/qp/physical/sys/CreateIndexPlan.java  |  23 ++++
 .../qp/physical/sys/CreateMultiTimeSeriesPlan.java |  58 ++++++++++
 .../db/qp/physical/sys/CreateTimeSeriesPlan.java   |  47 ++++++++
 .../iotdb/db/qp/physical/sys/DataAuthPlan.java     |  14 +++
 .../db/qp/physical/sys/DeleteStorageGroupPlan.java |  13 +++
 .../db/qp/physical/sys/DeleteTimeSeriesPlan.java   |  13 +++
 .../iotdb/db/qp/physical/sys/DropIndexPlan.java    |  15 +++
 .../apache/iotdb/db/qp/physical/sys/FlushPlan.java |  40 +++++++
 .../apache/iotdb/db/qp/physical/sys/MNodePlan.java |  16 ++-
 .../db/qp/physical/sys/MeasurementMNodePlan.java   |  20 +++-
 .../db/qp/physical/sys/SetStorageGroupPlan.java    |   8 ++
 .../iotdb/db/qp/physical/sys/SetTTLPlan.java       |  11 ++
 .../db/qp/physical/sys/StorageGroupMNodePlan.java  |  16 ++-
 .../apache/iotdb/db/writelog/io/ILogWriter.java    |   3 +
 .../org/apache/iotdb/db/writelog/io/LogWriter.java |  63 +++++++----
 .../db/writelog/node/ExclusiveWriteLogNode.java    |  46 ++++----
 .../apache/iotdb/db/writelog/WriteLogNodeTest.java |   2 +-
 tsfile/pom.xml                                     |   5 +
 .../tsfile/file/metadata/enums/TSDataType.java     |   5 +
 .../iotdb/tsfile/utils/ReadWriteIOUtils.java       |  92 +++++++++++++++
 .../tsfile/write/schema/MeasurementSchema.java     |  25 +++++
 30 files changed, 782 insertions(+), 68 deletions(-)

diff --git a/server/pom.xml b/server/pom.xml
index ff38b75..81f058d 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -191,6 +191,11 @@
             <groupId>com.google.code.gson</groupId>
             <artifactId>gson</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-buffer</artifactId>
+            <version>4.1.22.Final</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index e9bf655..a2430fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.qp.physical;
 
+import io.netty.buffer.ByteBuf;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -30,14 +31,14 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
-import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan;
 import org.apache.iotdb.db.qp.physical.sys.ChangeTagOffsetPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateIndexPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
@@ -139,6 +140,10 @@ public abstract class PhysicalPlan {
     throw new UnsupportedOperationException(SERIALIZATION_UNIMPLEMENTED);
   }
 
+  public void serialize(ByteBuf buffer) {
+    throw new UnsupportedOperationException(SERIALIZATION_UNIMPLEMENTED);
+  }
+
   /**
    * Deserialize the plan from the given buffer. This is provided for WAL, and must be used with
    * serializeToWAL.
@@ -157,12 +162,26 @@ public abstract class PhysicalPlan {
     }
   }
 
+  protected void putString(ByteBuf buffer, String value) {
+    if (value == null) {
+      buffer.writeInt(NULL_VALUE_LEN);
+    } else {
+      ReadWriteIOUtils.write(value, buffer);
+    }
+  }
+
   protected void putStrings(ByteBuffer buffer, List<String> values) {
     for (String value : values) {
       putString(buffer, value);
     }
   }
 
+  protected void putStrings(ByteBuf buffer, List<String> values) {
+    for (String value : values) {
+      putString(buffer, value);
+    }
+  }
+
   protected void putString(DataOutputStream stream, String value) throws IOException {
     if (value == null) {
       stream.writeInt(NULL_VALUE_LEN);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
index 9b1fe6a..982325a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.qp.physical.crud;
 
+import io.netty.buffer.ByteBuf;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -148,6 +149,20 @@ public class DeletePlan extends PhysicalPlan {
   }
 
   @Override
+  public void serialize(ByteBuf buffer) {
+    int type = PhysicalPlanType.DELETE.ordinal();
+    buffer.writeByte((byte) type);
+    buffer.writeLong(deleteStartTime);
+    buffer.writeLong(deleteEndTime);
+    buffer.writeInt(paths.size());
+    for (PartialPath path : paths) {
+      putString(buffer, path.getFullPath());
+    }
+
+    buffer.writeLong(index);
+  }
+
+  @Override
   public void deserialize(ByteBuffer buffer) throws IllegalPathException {
     this.deleteStartTime = buffer.getLong();
     this.deleteEndTime = buffer.getLong();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index ec32f5f..5b06d01 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.qp.physical.crud;
 
+import io.netty.buffer.ByteBuf;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -369,6 +370,44 @@ public class InsertRowPlan extends InsertPlan {
     }
   }
 
+  private void putValues(ByteBuf buffer) throws QueryProcessException {
+    for (int i = 0; i < values.length; i++) {
+      if (measurements[i] == null) {
+        continue;
+      }
+      // types are not determined, the situation mainly occurs when the plan uses string values
+      // and is forwarded to other nodes
+      if (dataTypes == null || dataTypes[i] == null) {
+        ReadWriteIOUtils.write(TYPE_RAW_STRING, buffer);
+        ReadWriteIOUtils.write((String) values[i], buffer);
+      } else {
+        ReadWriteIOUtils.write(dataTypes[i], buffer);
+        switch (dataTypes[i]) {
+          case BOOLEAN:
+            ReadWriteIOUtils.write((Boolean) values[i], buffer);
+            break;
+          case INT32:
+            ReadWriteIOUtils.write((Integer) values[i], buffer);
+            break;
+          case INT64:
+            ReadWriteIOUtils.write((Long) values[i], buffer);
+            break;
+          case FLOAT:
+            ReadWriteIOUtils.write((Float) values[i], buffer);
+            break;
+          case DOUBLE:
+            ReadWriteIOUtils.write((Double) values[i], buffer);
+            break;
+          case TEXT:
+            ReadWriteIOUtils.write((Binary) values[i], buffer);
+            break;
+          default:
+            throw new QueryProcessException("Unsupported data type:" + dataTypes[i]);
+        }
+      }
+    }
+  }
+
   /**
    * Make sure the values is already inited before calling this
    */
@@ -418,6 +457,16 @@ public class InsertRowPlan extends InsertPlan {
     serializeMeasurementsAndValues(buffer);
   }
 
+  @Override
+  public void serialize(ByteBuf buffer) {
+    int type = PhysicalPlanType.INSERT.ordinal();
+    buffer.writeByte((byte) type);
+    buffer.writeLong(time);
+
+    putString(buffer, deviceId.getFullPath());
+    serializeMeasurementsAndValues(buffer);
+  }
+
   void serializeMeasurementsAndValues(ByteBuffer buffer) {
     buffer
         .putInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
@@ -439,6 +488,27 @@ public class InsertRowPlan extends InsertPlan {
     buffer.putLong(index);
   }
 
+  void serializeMeasurementsAndValues(ByteBuf buffer) {
+    buffer
+        .writeInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
+
+    for (String measurement : measurements) {
+      if (measurement != null) {
+        putString(buffer, measurement);
+      }
+    }
+
+    try {
+      putValues(buffer);
+    } catch (QueryProcessException e) {
+      logger.error("Failed to serialize values for {}", this, e);
+    }
+
+    // the types are not inferred before the plan is serialized
+    buffer.writeByte((byte) (isNeedInferType ? 1 : 0));
+    buffer.writeLong(index);
+  }
+
   @Override
   public void deserialize(ByteBuffer buffer) throws IllegalPathException {
     this.time = buffer.getLong();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
index 66bf538..d15736d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.qp.physical.crud;
 
+import io.netty.buffer.ByteBuf;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -113,6 +114,20 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan {
   }
 
   @Override
+  public void serialize(ByteBuf buffer) {
+    int type = PhysicalPlanType.INSERT.ordinal();
+    buffer.writeByte((byte) type);
+
+    putString(buffer, deviceId.getFullPath());
+
+    buffer.writeInt(rowPlans.length);
+    for (InsertRowPlan plan : rowPlans) {
+      buffer.writeLong(plan.getTime());
+      plan.serializeMeasurementsAndValues(buffer);
+    }
+  }
+
+  @Override
   public void deserialize(ByteBuffer buffer) throws IllegalPathException {
 
     this.deviceId = new PartialPath(readString(buffer));
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index b3572b5..29b74f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.qp.physical.crud;
 
+import io.netty.buffer.ByteBuf;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -217,6 +218,18 @@ public class InsertTabletPlan extends InsertPlan {
     writeValues(buffer);
   }
 
+  @Override
+  public void serialize(ByteBuf buffer) {
+    int type = PhysicalPlanType.BATCHINSERT.ordinal();
+    buffer.writeByte((byte) type);
+
+    putString(buffer, deviceId.getFullPath());
+    writeMeasurements(buffer);
+    writeDataTypes(buffer);
+    writeTimes(buffer);
+    writeValues(buffer);
+  }
+
   private void writeMeasurements(ByteBuffer buffer) {
     buffer
         .putInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
@@ -227,6 +240,16 @@ public class InsertTabletPlan extends InsertPlan {
     }
   }
 
+  private void writeMeasurements(ByteBuf buffer) {
+    buffer
+        .writeInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
+    for (String m : measurements) {
+      if (m != null) {
+        putString(buffer, m);
+      }
+    }
+  }
+
   private void writeDataTypes(ByteBuffer buffer) {
     for (int i = 0, dataTypesLength = dataTypes.length; i < dataTypesLength; i++) {
       TSDataType dataType = dataTypes[i];
@@ -236,6 +259,15 @@ public class InsertTabletPlan extends InsertPlan {
     }
   }
 
+  private void writeDataTypes(ByteBuf buffer) {
+    for (int i = 0, dataTypesLength = dataTypes.length; i < dataTypesLength; i++) {
+      TSDataType dataType = dataTypes[i];
+      if (measurements[i] != null) {
+        dataType.serializeTo(buffer);
+      }
+    }
+  }
+
   private void writeTimes(ByteBuffer buffer) {
     if (isExecuting) {
       buffer.putInt(end - start);
@@ -259,6 +291,29 @@ public class InsertTabletPlan extends InsertPlan {
     }
   }
 
+  private void writeTimes(ByteBuf buffer) {
+    if (isExecuting) {
+      buffer.writeInt(end - start);
+    } else {
+      buffer.writeInt(rowCount);
+    }
+
+    if (timeBuffer == null) {
+      if (isExecuting) {
+        for (int i = start; i < end; i++) {
+          buffer.writeLong(times[i]);
+        }
+      } else {
+        for (long time : times) {
+          buffer.writeLong(time);
+        }
+      }
+    } else {
+      buffer.writeBytes(timeBuffer.array());
+      timeBuffer = null;
+    }
+  }
+
   private void writeValues(ByteBuffer buffer) {
     if (valueBuffer == null) {
       serializeValues(buffer);
@@ -270,6 +325,17 @@ public class InsertTabletPlan extends InsertPlan {
     buffer.putLong(index);
   }
 
+  private void writeValues(ByteBuf buffer) {
+    if (valueBuffer == null) {
+      serializeValues(buffer);
+    } else {
+      buffer.writeBytes(valueBuffer.array());
+      valueBuffer = null;
+    }
+
+    buffer.writeLong(index);
+  }
+
   private void serializeValues(DataOutputStream outputStream) throws IOException {
     for (int i = 0; i < measurements.length; i++) {
       if (measurements[i] == null) {
@@ -288,6 +354,15 @@ public class InsertTabletPlan extends InsertPlan {
     }
   }
 
+  private void serializeValues(ByteBuf buffer) {
+    for (int i = 0; i < measurements.length; i++) {
+      if (measurements[i] == null) {
+        continue;
+      }
+      serializeColumn(dataTypes[i], columns[i], buffer, start, end);
+    }
+  }
+
   private void serializeColumn(TSDataType dataType, Object column, ByteBuffer buffer,
       int start, int end) {
     int curStart = isExecuting ? start : 0;
@@ -336,6 +411,54 @@ public class InsertTabletPlan extends InsertPlan {
     }
   }
 
+  private void serializeColumn(TSDataType dataType, Object column, ByteBuf buffer,
+      int start, int end) {
+    int curStart = isExecuting ? start : 0;
+    int curEnd = isExecuting ? end : rowCount;
+    switch (dataType) {
+      case INT32:
+        int[] intValues = (int[]) column;
+        for (int j = curStart; j < curEnd; j++) {
+          buffer.writeInt(intValues[j]);
+        }
+        break;
+      case INT64:
+        long[] longValues = (long[]) column;
+        for (int j = curStart; j < curEnd; j++) {
+          buffer.writeLong(longValues[j]);
+        }
+        break;
+      case FLOAT:
+        float[] floatValues = (float[]) column;
+        for (int j = curStart; j < curEnd; j++) {
+          buffer.writeFloat(floatValues[j]);
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValues = (double[]) column;
+        for (int j = curStart; j < curEnd; j++) {
+          buffer.writeDouble(doubleValues[j]);
+        }
+        break;
+      case BOOLEAN:
+        boolean[] boolValues = (boolean[]) column;
+        for (int j = curStart; j < curEnd; j++) {
+          buffer.writeByte(BytesUtils.boolToByte(boolValues[j]));
+        }
+        break;
+      case TEXT:
+        Binary[] binaryValues = (Binary[]) column;
+        for (int j = curStart; j < curEnd; j++) {
+          buffer.writeInt(binaryValues[j].getLength());
+          buffer.writeBytes(binaryValues[j].getValues());
+        }
+        break;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format(DATATYPE_UNSUPPORTED, dataType));
+    }
+  }
+
   private void serializeColumn(TSDataType dataType, Object column, DataOutputStream outputStream,
       int start, int end) throws IOException {
     int curStart = isExecuting ? start : 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java
index bbf1d78..38bca24 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.qp.physical.sys;
 
+import io.netty.buffer.ByteBuf;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -333,6 +334,33 @@ public class AuthorPlan extends PhysicalPlan {
   }
 
   @Override
+  public void serialize(ByteBuf buffer) {
+    int type = this.getPlanType(super.getOperatorType());
+    buffer.writeByte((byte) type);
+    buffer.writeInt(authorType.ordinal());
+    putString(buffer, userName);
+    putString(buffer, roleName);
+    putString(buffer, password);
+    putString(buffer, newPassword);
+    if (permissions == null) {
+      buffer.writeByte((byte) 0);
+    } else {
+      buffer.writeInt((byte) 1);
+      buffer.writeInt(permissions.size());
+      for (int permission : permissions) {
+        buffer.writeInt(permission);
+      }
+    }
+    if (nodeName == null) {
+      putString(buffer, null);
+    } else {
+      putString(buffer, nodeName.getFullPath());
+    }
+
+    buffer.writeLong(index);
+  }
+
+  @Override
   public void deserialize(ByteBuffer buffer) throws IllegalPathException {
     this.authorType = AuthorType.values()[buffer.getInt()];
     this.userName = readString(buffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ChangeAliasPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ChangeAliasPlan.java
index a6bf1aa..6d84982 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ChangeAliasPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ChangeAliasPlan.java
@@ -19,17 +19,17 @@
 
 package org.apache.iotdb.db.qp.physical.sys;
 
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
-import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-
+import io.netty.buffer.ByteBuf;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 
 public class ChangeAliasPlan extends PhysicalPlan {
   private PartialPath path;
@@ -79,6 +79,14 @@ public class ChangeAliasPlan extends PhysicalPlan {
   }
 
   @Override
+  public void serialize(ByteBuf buffer) {
+    int type = PhysicalPlanType.CHANGE_ALIAS.ordinal();
+    buffer.writeByte((byte) type);
+    putString(buffer, path.getFullPath());
+    putString(buffer, alias);
+  }
+
+  @Override
   public void serialize(DataOutputStream stream) throws IOException {
     stream.write((byte) PhysicalPlanType.CHANGE_ALIAS.ordinal());
     putString(stream, path.getFullPath());
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ChangeTagOffsetPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ChangeTagOffsetPlan.java
index ba80502..2c4361f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ChangeTagOffsetPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ChangeTagOffsetPlan.java
@@ -19,17 +19,17 @@
 
 package org.apache.iotdb.db.qp.physical.sys;
 
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
-import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-
+import io.netty.buffer.ByteBuf;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 
 public class ChangeTagOffsetPlan extends PhysicalPlan {
   private PartialPath path;
@@ -79,6 +79,14 @@ public class ChangeTagOffsetPlan extends PhysicalPlan {
   }
 
   @Override
+  public void serialize(ByteBuf buffer) {
+    int type = PhysicalPlanType.CHANGE_TAG_OFFSET.ordinal();
+    buffer.writeByte((byte) type);
+    putString(buffer, path.getFullPath());
+    buffer.writeLong(offset);
+  }
+
+  @Override
   public void serialize(DataOutputStream stream) throws IOException {
     stream.write((byte) PhysicalPlanType.CHANGE_TAG_OFFSET.ordinal());
     putString(stream, path.getFullPath());
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateIndexPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateIndexPlan.java
index f614e42..691818d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateIndexPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateIndexPlan.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.qp.physical.sys;
 
+import io.netty.buffer.ByteBuf;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -134,6 +135,28 @@ public class CreateIndexPlan extends PhysicalPlan {
   }
 
   @Override
+  public void serialize(ByteBuf buffer) {
+    int type = PhysicalPlanType.CREATE_INDEX.ordinal();
+    buffer.writeByte((byte) type);
+    buffer.writeByte((byte) indexType.serialize());
+    buffer.writeLong(time);
+    buffer.writeInt(paths.size());
+    for (PartialPath path : paths) {
+      putString(buffer, path.getFullPath());
+    }
+
+    // props
+    if (props != null && !props.isEmpty()) {
+      buffer.writeByte((byte) 1);
+      ReadWriteIOUtils.write(props, buffer);
+    } else {
+      buffer.writeByte((byte) 0);
+    }
+
+    buffer.writeLong(index);
+  }
+
+  @Override
   public void deserialize(ByteBuffer buffer) throws IllegalPathException {
     indexType = IndexType.deserialize(buffer.get());
     time = buffer.getLong();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
index e381998..ba06ff4 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.qp.physical.sys;
 
+import io.netty.buffer.ByteBuf;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -227,6 +228,33 @@ public class CreateMultiTimeSeriesPlan extends PhysicalPlan {
     buffer.putLong(index);
   }
 
+  @Override
+  public void serialize(ByteBuf buffer) {
+    int type = PhysicalPlanType.CREATE_MULTI_TIMESERIES.ordinal();
+    buffer.writeByte((byte) type);
+    buffer.writeInt(paths.size());
+
+    for (PartialPath path : paths) {
+      putString(buffer, path.getFullPath());
+    }
+
+    for (TSDataType dataType : dataTypes) {
+      buffer.writeByte((byte) dataType.ordinal());
+    }
+
+    for (TSEncoding encoding : encodings) {
+      buffer.writeByte((byte) encoding.ordinal());
+    }
+
+    for (CompressionType compressor : compressors) {
+      buffer.writeByte((byte) compressor.ordinal());
+    }
+
+    serializeOptional(buffer);
+
+    buffer.writeLong(index);
+  }
+
   private void serializeOptional(ByteBuffer buffer) {
     if (alias != null) {
       buffer.put((byte) 1);
@@ -257,6 +285,36 @@ public class CreateMultiTimeSeriesPlan extends PhysicalPlan {
     }
   }
 
+  private void serializeOptional(ByteBuf buffer) {
+    if (alias != null) {
+      buffer.writeByte((byte) 1);
+      putStrings(buffer, alias);
+    } else {
+      buffer.writeByte((byte) 0);
+    }
+
+    if (props != null) {
+      buffer.writeByte((byte) 1);
+      ReadWriteIOUtils.write(props, buffer);
+    } else {
+      buffer.writeByte((byte) 0);
+    }
+
+    if (tags != null) {
+      buffer.writeByte((byte) 1);
+      ReadWriteIOUtils.write(tags, buffer);
+    } else {
+      buffer.writeByte((byte) 0);
+    }
+
+    if (attributes != null) {
+      buffer.writeByte((byte) 1);
+      ReadWriteIOUtils.write(attributes, buffer);
+    } else {
+      buffer.writeByte((byte) 0);
+    }
+  }
+
   @Override
   public void deserialize(ByteBuffer buffer) throws IllegalPathException {
     int totalSize = buffer.getInt();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateTimeSeriesPlan.java
index 7716af0..6fa38fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateTimeSeriesPlan.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.qp.physical.sys;
 
+import io.netty.buffer.ByteBuf;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -246,6 +247,52 @@ public class CreateTimeSeriesPlan extends PhysicalPlan {
   }
 
   @Override
+  public void serialize(ByteBuf buffer) {
+    buffer.writeByte((byte) PhysicalPlanType.CREATE_TIMESERIES.ordinal());
+    byte[] bytes = path.getFullPath().getBytes();
+    buffer.writeInt(bytes.length);
+    buffer.writeBytes(bytes);
+    buffer.writeByte((byte) dataType.ordinal());
+    buffer.writeByte((byte) encoding.ordinal());
+    buffer.writeByte((byte) compressor.ordinal());
+    buffer.writeLong(tagOffset);
+
+    // alias
+    if (alias != null) {
+      buffer.writeByte((byte) 1);
+      ReadWriteIOUtils.write(alias, buffer);
+    } else {
+      buffer.writeByte((byte) 0);
+    }
+
+    // props
+    if (props != null && !props.isEmpty()) {
+      buffer.writeByte((byte) 1);
+      ReadWriteIOUtils.write(props, buffer);
+    } else {
+      buffer.writeByte((byte) 0);
+    }
+
+    // tags
+    if (tags != null && !tags.isEmpty()) {
+      buffer.writeByte((byte) 1);
+      ReadWriteIOUtils.write(tags, buffer);
+    } else {
+      buffer.writeByte((byte) 0);
+    }
+
+    // attributes
+    if (attributes != null && !attributes.isEmpty()) {
+      buffer.writeByte((byte) 1);
+      ReadWriteIOUtils.write(attributes, buffer);
+    } else {
+      buffer.writeByte((byte) 0);
+    }
+
+    buffer.writeLong(index);
+  }
+
+  @Override
   public void deserialize(ByteBuffer buffer) throws IllegalPathException {
     int length = buffer.getInt();
     byte[] bytes = new byte[length];
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DataAuthPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DataAuthPlan.java
index b31e795..bd746ee 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DataAuthPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DataAuthPlan.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.qp.physical.sys;
 
+import io.netty.buffer.ByteBuf;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -77,6 +78,19 @@ public class DataAuthPlan extends PhysicalPlan {
   }
 
   @Override
+  public void serialize(ByteBuf buffer) {
+    int type = this.getPlanType(super.getOperatorType());
+    buffer.writeByte((byte) type);
+    buffer.writeInt(users.size());
+
+    for (String user : users) {
+      putString(buffer, user);
+    }
+
+    buffer.writeLong(index);
+  }
+
+  @Override
   public void deserialize(ByteBuffer buffer) {
     int userSize = buffer.getInt();
     this.users = new ArrayList<>(userSize);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteStorageGroupPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteStorageGroupPlan.java
index 5f8a13f..6e9cb9d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteStorageGroupPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteStorageGroupPlan.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.qp.physical.sys;
 
+import io.netty.buffer.ByteBuf;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -71,6 +72,18 @@ public class DeleteStorageGroupPlan extends PhysicalPlan {
   }
 
   @Override
+  public void serialize(ByteBuf buffer) {
+    int type = PhysicalPlanType.DELETE_STORAGE_GROUP.ordinal();
+    buffer.writeByte((byte) type);
+    buffer.writeInt(this.getPaths().size());
+    for (PartialPath path : this.getPaths()) {
+      putString(buffer, path.getFullPath());
+    }
+
+    buffer.writeLong(index);
+  }
+
+  @Override
   public void deserialize(ByteBuffer buffer) throws IllegalPathException {
     int pathNum = buffer.getInt();
     this.deletePathList = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
index 24373be..cd0c3a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.qp.physical.sys;
 
+import io.netty.buffer.ByteBuf;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -75,6 +76,18 @@ public class DeleteTimeSeriesPlan extends PhysicalPlan {
   }
 
   @Override
+  public void serialize(ByteBuf buffer) {
+    int type = PhysicalPlanType.DELETE_TIMESERIES.ordinal();
+    buffer.writeByte((byte) type);
+    buffer.writeInt(deletePathList.size());
+    for (PartialPath path : deletePathList) {
+      putString(buffer, path.getFullPath());
+    }
+
+    buffer.writeLong(index);
+  }
+
+  @Override
   public void deserialize(ByteBuffer buffer) throws IllegalPathException {
     int pathNumber = buffer.getInt();
     deletePathList = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DropIndexPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DropIndexPlan.java
index 41b4ad8..b69da72 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DropIndexPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DropIndexPlan.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.qp.physical.sys;
 
+import io.netty.buffer.ByteBuf;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -94,6 +95,20 @@ public class DropIndexPlan extends PhysicalPlan {
   }
 
   @Override
+  public void serialize(ByteBuf buffer) {
+    int type = PhysicalPlanType.DROP_INDEX.ordinal();
+    buffer.writeByte((byte) type);
+    buffer.writeByte((byte) indexType.serialize());
+
+    buffer.writeInt(paths.size());
+    for (PartialPath path : paths) {
+      putString(buffer, path.getFullPath());
+    }
+
+    buffer.writeLong(index);
+  }
+
+  @Override
   public void deserialize(ByteBuffer buffer) throws IllegalPathException {
     indexType = IndexType.deserialize(buffer.get());
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/FlushPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/FlushPlan.java
index c6c0ef3..6557d12 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/FlushPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/FlushPlan.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.qp.physical.sys;
 
+import io.netty.buffer.ByteBuf;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -158,6 +159,19 @@ public class FlushPlan extends PhysicalPlan {
     writeStorageGroupPartitionIds(buffer);
   }
 
+  @Override
+  public void serialize(ByteBuf buffer) {
+    int type = PhysicalPlanType.FLUSH.ordinal();
+    buffer.writeByte((byte) type);
+    if (isSeq == null) {
+      buffer.writeByte((byte) 2);
+    } else {
+      buffer.writeByte((byte) (Boolean.TRUE.equals(isSeq) ? 1 : 0));
+    }
+    buffer.writeByte((byte) (isSync ? 1 : 0));
+    writeStorageGroupPartitionIds(buffer);
+  }
+
   public void writeStorageGroupPartitionIds(ByteBuffer buffer) {
     if (storageGroupPartitionIds == null) {
       // null value
@@ -184,6 +198,32 @@ public class FlushPlan extends PhysicalPlan {
     }
   }
 
+  public void writeStorageGroupPartitionIds(ByteBuf buffer) {
+    if (storageGroupPartitionIds == null) {
+      // null value
+      buffer.writeByte((byte) 0);
+    } else {
+      // null value
+      buffer.writeByte((byte) 1);
+      buffer.writeInt(storageGroupPartitionIds.size());
+      for (Entry<PartialPath, List<Pair<Long, Boolean>>> entry : storageGroupPartitionIds
+          .entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey().getFullPath(), buffer);
+        if (entry.getValue() == null) {
+          // null value
+          buffer.writeByte((byte) 0);
+        } else {
+          buffer.writeByte((byte) 1);
+          ReadWriteIOUtils.write(entry.getValue().size(), buffer);
+          for (Pair<Long, Boolean> pair : entry.getValue()) {
+            ReadWriteIOUtils.write(pair.left, buffer);
+            ReadWriteIOUtils.write(pair.right, buffer);
+          }
+        }
+      }
+    }
+  }
+
   @Override
   public void deserialize(ByteBuffer buffer) {
     byte isSeqFlag = buffer.get();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MNodePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MNodePlan.java
index e5c5140..0237492 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MNodePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MNodePlan.java
@@ -19,16 +19,16 @@
 
 package org.apache.iotdb.db.qp.physical.sys;
 
-import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-
+import io.netty.buffer.ByteBuf;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 
 public class MNodePlan extends PhysicalPlan {
   protected String name;
@@ -78,6 +78,14 @@ public class MNodePlan extends PhysicalPlan {
   }
 
   @Override
+  public void serialize(ByteBuf buffer) {
+    buffer.writeByte((byte) PhysicalPlanType.MNODE.ordinal());
+    putString(buffer, name);
+    buffer.writeInt(childSize);
+    buffer.writeLong(index);
+  }
+
+  @Override
   public void serialize(DataOutputStream stream) throws IOException {
     stream.write((byte) PhysicalPlanType.MNODE.ordinal());
     putString(stream, name);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MeasurementMNodePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MeasurementMNodePlan.java
index c69f53f..713a4d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MeasurementMNodePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MeasurementMNodePlan.java
@@ -18,16 +18,17 @@
  */
 
 package org.apache.iotdb.db.qp.physical.sys;
-import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
+import io.netty.buffer.ByteBuf;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 public class MeasurementMNodePlan extends MNodePlan {
   private MeasurementSchema schema;
@@ -67,6 +68,19 @@ public class MeasurementMNodePlan extends MNodePlan {
   }
 
   @Override
+  public void serialize(ByteBuf buffer) {
+    buffer.writeByte((byte) PhysicalPlanType.MEASUREMENT_MNODE.ordinal());
+
+    putString(buffer, name);
+    putString(buffer, alias);
+    buffer.writeLong(offset);
+    buffer.writeInt(childSize);
+    schema.serializeTo(buffer);
+
+    buffer.writeLong(index);
+  }
+
+  @Override
   public void serialize(DataOutputStream stream) throws IOException {
     stream.write((byte) PhysicalPlanType.MEASUREMENT_MNODE.ordinal());
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetStorageGroupPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetStorageGroupPlan.java
index a7a6e18..1f17489 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetStorageGroupPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetStorageGroupPlan.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.qp.physical.sys;
 
+import io.netty.buffer.ByteBuf;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -70,6 +71,13 @@ public class SetStorageGroupPlan extends PhysicalPlan {
   }
 
   @Override
+  public void serialize(ByteBuf buffer) {
+    buffer.writeByte((byte) PhysicalPlanType.SET_STORAGE_GROUP.ordinal());
+    putString(buffer, path.getFullPath());
+    buffer.writeLong(index);
+  }
+
+  @Override
   public void deserialize(ByteBuffer buffer) throws IllegalPathException {
     path = new PartialPath(readString(buffer));
     this.index = buffer.getLong();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetTTLPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetTTLPlan.java
index deea2db..8001846 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetTTLPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetTTLPlan.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.qp.physical.sys;
 
+import io.netty.buffer.ByteBuf;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -76,6 +77,16 @@ public class SetTTLPlan extends PhysicalPlan {
   }
 
   @Override
+  public void serialize(ByteBuf buffer) {
+    int type = PhysicalPlanType.TTL.ordinal();
+    buffer.writeByte((byte) type);
+    buffer.writeLong(dataTTL);
+    putString(buffer, storageGroup.getFullPath());
+
+    buffer.writeLong(index);
+  }
+
+  @Override
   public void deserialize(ByteBuffer buffer) throws IllegalPathException {
     this.dataTTL = buffer.getLong();
     this.storageGroup = new PartialPath(readString(buffer));
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StorageGroupMNodePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StorageGroupMNodePlan.java
index 451475b..f71d8bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StorageGroupMNodePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StorageGroupMNodePlan.java
@@ -19,15 +19,15 @@
 
 package org.apache.iotdb.db.qp.physical.sys;
 
-import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.qp.logical.Operator;
-
+import io.netty.buffer.ByteBuf;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
 
 public class StorageGroupMNodePlan extends MNodePlan {
   private long dataTTL;
@@ -67,6 +67,16 @@ public class StorageGroupMNodePlan extends MNodePlan {
   }
 
   @Override
+  public void serialize(ByteBuf buffer) {
+    buffer.writeByte((byte) PhysicalPlanType.STORAGE_GROUP_MNODE.ordinal());
+    putString(buffer, name);
+    buffer.writeLong(dataTTL);
+    buffer.writeInt(childSize);
+
+    buffer.writeLong(index);
+  }
+
+  @Override
   public void serialize(DataOutputStream stream) throws IOException {
     stream.write((byte) PhysicalPlanType.STORAGE_GROUP_MNODE.ordinal());
     putString(stream, name);
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/io/ILogWriter.java b/server/src/main/java/org/apache/iotdb/db/writelog/io/ILogWriter.java
index 07ac023..02b3663 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/io/ILogWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/io/ILogWriter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.writelog.io;
 
+import io.netty.buffer.ByteBuf;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
@@ -38,6 +39,8 @@ public interface ILogWriter {
    */
   void write(ByteBuffer logBuffer) throws IOException;
 
+  void write(ByteBuf logBuffer) throws IOException;
+
   /**
    * force the OS/FileSystem to flush its cache to make sure logs are persisted.
    * @throws IOException
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java b/server/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java
index 1b21dfe..3a58835 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.writelog.io;
 
+import io.netty.buffer.ByteBuf;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
@@ -30,7 +31,6 @@ import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import sun.nio.ch.DirectBuffer;
 
 /**
  * LogWriter writes the binary logs into a file using FileChannel together with check sums of
@@ -43,8 +43,8 @@ public class LogWriter implements ILogWriter {
   private FileOutputStream fileOutputStream;
   private FileChannel channel;
   private final CRC32 checkSummer = new CRC32();
-  private final ByteBuffer lengthBuffer = ByteBuffer.allocateDirect(4);
-  private final ByteBuffer checkSumBuffer = ByteBuffer.allocateDirect(8);
+  private final ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
+  private final ByteBuffer checkSumBuffer = ByteBuffer.allocate(8);
   private final boolean forceEachWrite;
 
   /**
@@ -106,6 +106,39 @@ public class LogWriter implements ILogWriter {
   }
 
   @Override
+  public void write(ByteBuf logBuffer) throws IOException {
+    if (channel == null) {
+      fileOutputStream = new FileOutputStream(logFile, true);
+      channel = fileOutputStream.getChannel();
+    }
+    int logSize = logBuffer.readableBytes();
+    // 4 bytes size and 8 bytes check sum
+
+    checkSummer.reset();
+    checkSummer.update(logBuffer.nioBuffer());
+    long checkSum = checkSummer.getValue();
+
+    lengthBuffer.clear();
+    checkSumBuffer.clear();
+    lengthBuffer.putInt(logSize);
+    checkSumBuffer.putLong(checkSum);
+    lengthBuffer.flip();
+    checkSumBuffer.flip();
+
+    try {
+      channel.write(lengthBuffer);
+      logBuffer.getBytes(0, channel, channel.position(), logBuffer.readableBytes());
+      channel.write(checkSumBuffer);
+
+      if (this.forceEachWrite) {
+        channel.force(true);
+      }
+    } catch (ClosedChannelException ignored) {
+      logger.warn("someone interrupt current thread, so no need to do write for io safety");
+    }
+  }
+
+  @Override
   public void force() throws IOException {
     if (channel != null && channel.isOpen()) {
       channel.force(true);
@@ -114,25 +147,15 @@ public class LogWriter implements ILogWriter {
 
   @Override
   public void close() throws IOException {
-    try {
-      if (channel != null) {
-        if (channel.isOpen()) {
-          channel.force(true);
-        }
-        fileOutputStream.close();
-        fileOutputStream = null;
-        channel.close();
-        channel = null;
-      }
-    } finally {
-      if (lengthBuffer.isDirect()) {
-        ((DirectBuffer) lengthBuffer).cleaner().clean();
-      }
-      if (checkSumBuffer.isDirect()) {
-        ((DirectBuffer) checkSumBuffer).cleaner().clean();
+    if (channel != null) {
+      if (channel.isOpen()) {
+        channel.force(true);
       }
+      fileOutputStream.close();
+      fileOutputStream = null;
+      channel.close();
+      channel = null;
     }
-
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index de41479..64c9194 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -19,11 +19,11 @@
 package org.apache.iotdb.db.writelog.node;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.nio.BufferOverflowException;
-import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -42,7 +42,6 @@ import org.apache.iotdb.db.writelog.io.LogWriter;
 import org.apache.iotdb.db.writelog.io.MultiFileLogReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import sun.nio.ch.DirectBuffer;
 
 /**
  * This WriteLogNode is used to manage insert ahead logs of a TsFile.
@@ -51,6 +50,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
 
   public static final String WAL_FILE_NAME = "wal";
   private static final Logger logger = LoggerFactory.getLogger(ExclusiveWriteLogNode.class);
+  private static final PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
 
   private String identifier;
 
@@ -60,11 +60,12 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
 
   private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
-  private ByteBuffer logBufferWorking = ByteBuffer
-      .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
-  private ByteBuffer logBufferIdle = ByteBuffer
-      .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
-  private ByteBuffer logBufferFlushing;
+  private final int WAL_BUFFER_THRESHOLD =
+      IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2;
+
+  private ByteBuf logBufferWorking = allocator.directBuffer(WAL_BUFFER_THRESHOLD);
+  private ByteBuf logBufferIdle = allocator.directBuffer(WAL_BUFFER_THRESHOLD);
+  private ByteBuf logBufferFlushing;
 
   private final Object switchBufferCondition = new Object();
   private ReentrantLock lock = new ReentrantLock();
@@ -104,23 +105,16 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
       if (bufferedLogNum >= config.getFlushWalThreshold()) {
         sync();
       }
-    } catch (BufferOverflowException e) {
-      throw new IOException(
-          "Log cannot fit into the buffer, please increase wal_buffer_size", e);
     } finally {
       lock.unlock();
     }
   }
 
   private void putLog(PhysicalPlan plan) {
-    logBufferWorking.mark();
-    try {
-      plan.serialize(logBufferWorking);
-    } catch (BufferOverflowException e) {
-      logger.info("WAL BufferOverflow !");
-      logBufferWorking.reset();
+    logBufferWorking.markWriterIndex();
+    plan.serialize(logBufferWorking);
+    if (logBufferWorking.readableBytes() >= WAL_BUFFER_THRESHOLD) {
       sync();
-      plan.serialize(logBufferWorking);
     }
     bufferedLogNum++;
   }
@@ -150,13 +144,6 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
       Thread.currentThread().interrupt();
       logger.warn("Waiting for current buffer being flushed interrupted");
     } finally {
-      logBufferFlushing = null;
-      if (logBufferWorking.isDirect()) {
-        ((DirectBuffer) logBufferWorking).cleaner().clean();
-      }
-      if (logBufferIdle.isDirect()) {
-        ((DirectBuffer) logBufferIdle).cleaner().clean();
-      }
       lock.unlock();
     }
   }
@@ -212,6 +199,15 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
       FileUtils.deleteDirectory(SystemFileFactory.INSTANCE.getFile(logDirectory));
       deleted = true;
     } finally {
+      if (logBufferFlushing != null && logBufferFlushing.refCnt() != 0) {
+        logBufferFlushing.release();
+      }
+      if (logBufferIdle != null && logBufferIdle.refCnt() != 0) {
+        logBufferIdle.release();
+      }
+      if (logBufferWorking != null && logBufferWorking.refCnt() != 0) {
+        logBufferWorking.release();
+      }
       lock.unlock();
     }
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
index fe5610c..4566ae9 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
@@ -239,7 +239,7 @@ public class WriteLogNodeTest {
     } catch (IOException e) {
       caught = true;
     }
-    assertTrue(caught);
+    assertFalse(caught);
 
     logNode.delete();
   }
diff --git a/tsfile/pom.xml b/tsfile/pom.xml
index e05e38a..3418bbb 100644
--- a/tsfile/pom.xml
+++ b/tsfile/pom.xml
@@ -58,6 +58,11 @@
             <groupId>com.google.code.gson</groupId>
             <artifactId>gson</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-buffer</artifactId>
+            <version>4.1.22.Final</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
index 505be1b..ce99ff8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.file.metadata.enums;
 
+import io.netty.buffer.ByteBuf;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -86,6 +87,10 @@ public enum TSDataType {
     byteBuffer.putShort(serialize());
   }
 
+  public void serializeTo(ByteBuf byteBuffer) {
+    byteBuffer.writeShort(serialize());
+  }
+
   public void serializeTo(DataOutputStream outputStream) throws IOException {
     outputStream.writeShort(serialize());
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index d4b7570..ce6ba0e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -28,6 +28,7 @@ import static org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.LO
 import static org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.NULL;
 import static org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.STRING;
 
+import io.netty.buffer.ByteBuf;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -157,12 +158,38 @@ public class ReadWriteIOUtils {
     return length;
   }
 
+  public static int write(Map<String, String> map, ByteBuf buffer) {
+    int length = 0;
+    byte[] bytes;
+    buffer.writeInt(map.size());
+    length += 4;
+    for (Entry<String, String> entry : map.entrySet()) {
+      bytes = entry.getKey().getBytes();
+      buffer.writeInt(bytes.length);
+      length += 4;
+      buffer.writeBytes(bytes);
+      length += bytes.length;
+      bytes = entry.getValue().getBytes();
+      buffer.writeInt(bytes.length);
+      length += 4;
+      buffer.writeBytes(bytes);
+      length += bytes.length;
+    }
+    return length;
+  }
+
   public static void write(List<Map<String, String>> maps, ByteBuffer buffer) {
     for (Map<String, String> map : maps) {
       write(map, buffer);
     }
   }
 
+  public static void write(List<Map<String, String>> maps, ByteBuf buffer) {
+    for (Map<String, String> map : maps) {
+      write(map, buffer);
+    }
+  }
+
   /**
    * write a int value to outputStream according to flag. If flag is true, write 1, else write 0.
    */
@@ -190,6 +217,18 @@ public class ReadWriteIOUtils {
     return 1;
   }
 
+  public static int write(Boolean flag, ByteBuf buffer) {
+    byte a;
+    if (Boolean.TRUE.equals(flag)) {
+      a = 1;
+    } else {
+      a = 0;
+    }
+
+    buffer.writeByte(a);
+    return 1;
+  }
+
   /**
    * write a byte n.
    *
@@ -223,6 +262,11 @@ public class ReadWriteIOUtils {
     return Byte.BYTES;
   }
 
+  public static int write(byte n, ByteBuf buffer) {
+    buffer.writeByte(n);
+    return Byte.BYTES;
+  }
+
   /**
    * write a short n to byteBuffer.
    *
@@ -233,6 +277,11 @@ public class ReadWriteIOUtils {
     return SHORT_LEN;
   }
 
+  public static int write(short n, ByteBuf buffer) {
+    buffer.writeShort(n);
+    return SHORT_LEN;
+  }
+
   /**
    * write a short n to byteBuffer.
    *
@@ -244,6 +293,12 @@ public class ReadWriteIOUtils {
     return INT_LEN + n.getLength();
   }
 
+  public static int write(Binary n, ByteBuf buffer) {
+    buffer.writeInt(n.getLength());
+    buffer.writeBytes(n.getValues());
+    return INT_LEN + n.getLength();
+  }
+
   /**
    * write a int n to outputStream.
    *
@@ -275,6 +330,11 @@ public class ReadWriteIOUtils {
     return INT_LEN;
   }
 
+  public static int write(int n, ByteBuf buffer) {
+    buffer.writeInt(n);
+    return INT_LEN;
+  }
+
   /**
    * write a float n to outputStream.
    *
@@ -316,6 +376,11 @@ public class ReadWriteIOUtils {
     return LONG_LEN;
   }
 
+  public static int write(long n, ByteBuf buffer) {
+    buffer.writeLong(n);
+    return LONG_LEN;
+  }
+
   /**
    * write a float n to byteBuffer.
    */
@@ -324,6 +389,11 @@ public class ReadWriteIOUtils {
     return FLOAT_LEN;
   }
 
+  public static int write(float n, ByteBuf buffer) {
+    buffer.writeFloat(n);
+    return FLOAT_LEN;
+  }
+
   /**
    * write a double n to byteBuffer.
    */
@@ -332,6 +402,11 @@ public class ReadWriteIOUtils {
     return DOUBLE_LEN;
   }
 
+  public static int write(double n, ByteBuf buffer) {
+    buffer.writeDouble(n);
+    return DOUBLE_LEN;
+  }
+
 
   /**
    * write string to outputStream.
@@ -369,6 +444,18 @@ public class ReadWriteIOUtils {
     return len;
   }
 
+  public static int write(String s, ByteBuf buffer) {
+    if (s == null) {
+      return write(-1, buffer);
+    }
+    int len = 0;
+    byte[] bytes = s.getBytes();
+    len += write(bytes.length, buffer);
+    buffer.writeBytes(bytes);
+    len += bytes.length;
+    return len;
+  }
+
   /**
    * write byteBuffer.capacity and byteBuffer.array to outputStream.
    */
@@ -429,6 +516,11 @@ public class ReadWriteIOUtils {
     return write(n, buffer);
   }
 
+  public static int write(TSDataType dataType, ByteBuf buffer) {
+    short n = dataType.serialize();
+    return write(n, buffer);
+  }
+
   /**
    * TSEncoding.
    */
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
index 11030c8..78c288f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.write.schema;
 
+import io.netty.buffer.ByteBuf;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -269,6 +270,30 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
     return byteLen;
   }
 
+  public int serializeTo(ByteBuf buffer) {
+    int byteLen = 0;
+
+    byteLen += ReadWriteIOUtils.write(measurementId, buffer);
+
+    byteLen += ReadWriteIOUtils.write((short) type, buffer);
+
+    byteLen += ReadWriteIOUtils.write((short) encoding, buffer);
+
+    byteLen += ReadWriteIOUtils.write((short) compressor, buffer);
+
+    if (props == null) {
+      byteLen += ReadWriteIOUtils.write(0, buffer);
+    } else {
+      byteLen += ReadWriteIOUtils.write(props.size(), buffer);
+      for (Map.Entry<String, String> entry : props.entrySet()) {
+        byteLen += ReadWriteIOUtils.write(entry.getKey(), buffer);
+        byteLen += ReadWriteIOUtils.write(entry.getValue(), buffer);
+      }
+    }
+
+    return byteLen;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {