You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/14 10:56:37 UTC

[iotdb] branch master updated: [IOTDB-2863] Serialize and deserialize of insert node (#5517)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d1278f8507 [IOTDB-2863] Serialize and deserialize of insert node (#5517)
d1278f8507 is described below

commit d1278f85074b961caa37bbf84dfd6b1b8ede4c49
Author: Mrquan <50...@users.noreply.github.com>
AuthorDate: Thu Apr 14 18:56:31 2022 +0800

    [IOTDB-2863] Serialize and deserialize of insert node (#5517)
---
 .../db/mpp/sql/planner/plan/node/PlanNodeId.java   |  16 +
 .../db/mpp/sql/planner/plan/node/PlanNodeType.java |  16 +
 .../plan/node/write/InsertMultiTabletsNode.java    |  15 +
 .../sql/planner/plan/node/write/InsertNode.java    |  50 ++-
 .../sql/planner/plan/node/write/InsertRowNode.java | 344 ++++++++++++++-
 .../planner/plan/node/write/InsertRowsNode.java    |  16 +
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |  16 +
 .../planner/plan/node/write/InsertTabletNode.java  | 473 ++++++++++++++++++++-
 .../sql/statement/crud/InsertTabletStatement.java  |   2 +-
 .../org/apache/iotdb/db/wal/buffer/WALEntry.java   |   5 +-
 .../plan/node/write/InsertRowNodeSerdeTest.java    | 126 ++++++
 .../plan/node/write/InsertTabletNodeSerdeTest.java |  94 ++++
 12 files changed, 1155 insertions(+), 18 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java
index 58519e303d..f0e7533e03 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java
@@ -18,8 +18,12 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node;
 
+import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
+import org.apache.iotdb.db.wal.utils.WALWriteUtils;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import java.io.DataInputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 public class PlanNodeId {
@@ -58,4 +62,16 @@ public class PlanNodeId {
   public void serialize(ByteBuffer byteBuffer) {
     ReadWriteIOUtils.write(id, byteBuffer);
   }
+
+  public int serializedSize() {
+    return ReadWriteIOUtils.sizeToWrite(id);
+  }
+
+  public void serializeToWAL(IWALByteBufferView buffer) {
+    WALWriteUtils.write(id, buffer);
+  }
+
+  public static PlanNodeId deserialize(DataInputStream stream) throws IOException {
+    return new PlanNodeId(ReadWriteIOUtils.readString(stream));
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
index 4e2400a836..99b5d4fe35 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeType.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node;
 
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.DevicesSchemaScanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaFetchNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.ShowDevicesNode;
@@ -46,6 +47,8 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowsNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
 
+import java.io.DataInputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 public enum PlanNodeType {
@@ -88,6 +91,19 @@ public enum PlanNodeType {
     buffer.putShort(nodeType);
   }
 
+  public static PlanNode deserialize(DataInputStream stream)
+      throws IOException, IllegalPathException {
+    short nodeType = stream.readShort();
+    switch (nodeType) {
+      case 13:
+        return InsertTabletNode.deserialize(stream);
+      case 14:
+        return InsertRowNode.deserialize(stream);
+      default:
+        throw new IllegalArgumentException("Invalid node type: " + nodeType);
+    }
+  }
+
   public static PlanNode deserialize(ByteBuffer buffer) {
     short nodeType = buffer.getShort();
     switch (nodeType) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
index 946fa9f7f2..3e2fd699d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -162,4 +162,19 @@ public class InsertMultiTabletsNode extends InsertNode {
 
   @Override
   public void serialize(ByteBuffer byteBuffer) {}
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    if (!super.equals(o)) return false;
+    InsertMultiTabletsNode that = (InsertMultiTabletsNode) o;
+    return Objects.equals(parentInsertTabletNodeIndexList, that.parentInsertTabletNodeIndexList)
+        && Objects.equals(insertTabletNodeList, that.insertTabletNodeList);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), parentInsertTabletNodeIndexList, insertTabletNodeList);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
index 6856f2c0dc..08c861913d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
@@ -28,6 +28,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Objects;
 
 public abstract class InsertNode extends WritePlanNode {
 
@@ -68,6 +70,13 @@ public abstract class InsertNode extends WritePlanNode {
     this.isAligned = isAligned;
     this.measurementSchemas = measurementSchemas;
     this.dataTypes = dataTypes;
+
+    this.measurements = new String[measurementSchemas.length];
+    for (int i = 0; i < measurementSchemas.length; i++) {
+      if (measurementSchemas[i] != null) {
+        measurements[i] = measurementSchemas[i].getMeasurementId();
+      }
+    }
   }
 
   public RegionReplicaSet getDataRegionReplicaSet() {
@@ -103,12 +112,6 @@ public abstract class InsertNode extends WritePlanNode {
   }
 
   public String[] getMeasurements() {
-    if (measurements == null) {
-      measurements = new String[measurementSchemas.length];
-      for (int i = 0; i < measurementSchemas.length; i++) {
-        measurements[i] = measurementSchemas[i].getMeasurementId();
-      }
-    }
     return measurements;
   }
 
@@ -136,4 +139,39 @@ public abstract class InsertNode extends WritePlanNode {
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     throw new NotImplementedException("serializeAttributes of InsertNode is not implemented");
   }
+
+  protected int countFailedMeasurements() {
+    int result = 0;
+    for (MeasurementSchema measurement : measurementSchemas) {
+      if (measurement == null) {
+        result++;
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    if (!super.equals(o)) return false;
+    InsertNode that = (InsertNode) o;
+    return isAligned == that.isAligned
+        && Objects.equals(devicePath, that.devicePath)
+        && Arrays.equals(measurementSchemas, that.measurementSchemas)
+        && Arrays.equals(measurements, that.measurements)
+        && Arrays.equals(dataTypes, that.dataTypes)
+        && Objects.equals(deviceID, that.deviceID)
+        && Objects.equals(dataRegionReplicaSet, that.dataRegionReplicaSet);
+  }
+
+  @Override
+  public int hashCode() {
+    int result =
+        Objects.hash(super.hashCode(), devicePath, isAligned, deviceID, dataRegionReplicaSet);
+    result = 31 * result + Arrays.hashCode(measurementSchemas);
+    result = 31 * result + Arrays.hashCode(measurements);
+    result = 31 * result + Arrays.hashCode(dataTypes);
+    return result;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
index 1d37b01626..2b3481fd74 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
@@ -20,23 +20,40 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
 
 import org.apache.iotdb.commons.partition.TimePartitionSlot;
 import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.wal.buffer.WALEntryValue;
+import org.apache.iotdb.db.wal.utils.WALWriteUtils;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 
 public class InsertRowNode extends InsertNode implements WALEntryValue {
 
+  private static final Logger logger = LoggerFactory.getLogger(InsertRowNode.class);
+
+  private static final byte TYPE_NULL = -2;
+
   private long time;
   private Object[] values;
 
@@ -87,14 +104,199 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
 
   @Override
   public int serializedSize() {
-    return 0;
+    int size = 0;
+    size += Short.BYTES;
+    size += this.getPlanNodeId().serializedSize();
+    return size + subSerializeSize();
+  }
+
+  int subSerializeSize() {
+    int size = 0;
+    size += Long.BYTES;
+    size += ReadWriteIOUtils.sizeToWrite(devicePath.getFullPath());
+    return size + serializeMeasurementsAndValuesSize();
+  }
+
+  int serializeMeasurementsAndValuesSize() {
+    int size = 0;
+    size += Integer.BYTES;
+    for (String m : measurements) {
+      if (m != null) {
+        size += ReadWriteIOUtils.sizeToWrite(m);
+      }
+    }
+
+    // putValues
+    for (int i = 0; i < values.length; i++) {
+      if (dataTypes[i] != null) {
+        if (values[i] == null) {
+          size += Byte.BYTES;
+          continue;
+        }
+        size += Byte.BYTES;
+        switch (dataTypes[i]) {
+          case BOOLEAN:
+            size += Byte.BYTES;
+            break;
+          case INT32:
+            size += Integer.BYTES;
+            break;
+          case INT64:
+            size += Long.BYTES;
+            break;
+          case FLOAT:
+            size += Float.BYTES;
+            break;
+          case DOUBLE:
+            size += Double.BYTES;
+            break;
+          case TEXT:
+            size += ReadWriteIOUtils.sizeToWrite((Binary) values[i]);
+            break;
+        }
+      }
+    }
+
+    size += Byte.BYTES;
+    return size;
   }
 
   @Override
-  public void serialize(ByteBuffer byteBuffer) {}
+  public int hashCode() {
+    int result = Objects.hash(super.hashCode(), time);
+    result = 31 * result + Arrays.hashCode(values);
+    return result;
+  }
 
   @Override
-  public void serializeToWAL(IWALByteBufferView buffer) {}
+  public void serialize(ByteBuffer byteBuffer) {
+    byteBuffer.putShort((short) PlanNodeType.INSERT_ROW.ordinal());
+    getPlanNodeId().serialize(byteBuffer);
+    subSerialize(byteBuffer);
+  }
+
+  void subSerialize(ByteBuffer buffer) {
+    buffer.putLong(time);
+    ReadWriteIOUtils.write(devicePath.getFullPath(), buffer);
+    serializeMeasurementsAndValues(buffer);
+  }
+
+  void serializeMeasurementsAndValues(ByteBuffer buffer) {
+    buffer.putInt(measurementSchemas.length - countFailedMeasurements());
+
+    for (MeasurementSchema measurement : measurementSchemas) {
+      if (measurement != null) {
+        measurement.serializeTo(buffer);
+      }
+    }
+
+    try {
+      putValues(buffer);
+    } catch (QueryProcessException e) {
+      logger.error("Failed to serialize values for {}", this, e);
+    }
+
+    buffer.put((byte) (isAligned ? 1 : 0));
+  }
+
+  private void putValues(ByteBuffer buffer) throws QueryProcessException {
+    for (int i = 0; i < values.length; i++) {
+      if (dataTypes[i] != null) {
+        if (values[i] == null) {
+          ReadWriteIOUtils.write(TYPE_NULL, buffer);
+          continue;
+        }
+        ReadWriteIOUtils.write(dataTypes[i], buffer);
+        switch (dataTypes[i]) {
+          case BOOLEAN:
+            ReadWriteIOUtils.write((Boolean) values[i], buffer);
+            break;
+          case INT32:
+            ReadWriteIOUtils.write((Integer) values[i], buffer);
+            break;
+          case INT64:
+            ReadWriteIOUtils.write((Long) values[i], buffer);
+            break;
+          case FLOAT:
+            ReadWriteIOUtils.write((Float) values[i], buffer);
+            break;
+          case DOUBLE:
+            ReadWriteIOUtils.write((Double) values[i], buffer);
+            break;
+          case TEXT:
+            ReadWriteIOUtils.write((Binary) values[i], buffer);
+            break;
+          default:
+            throw new QueryProcessException("Unsupported data type:" + dataTypes[i]);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void serializeToWAL(IWALByteBufferView buffer) {
+    buffer.putShort((short) PlanNodeType.INSERT_ROW.ordinal());
+    getPlanNodeId().serializeToWAL(buffer);
+    subSerialize(buffer);
+  }
+
+  void subSerialize(IWALByteBufferView buffer) {
+    buffer.putLong(time);
+    WALWriteUtils.write(devicePath.getFullPath(), buffer);
+    serializeMeasurementsAndValues(buffer);
+  }
+
+  void serializeMeasurementsAndValues(IWALByteBufferView buffer) {
+    buffer.putInt(measurementSchemas.length - countFailedMeasurements());
+
+    for (String measurement : measurements) {
+      if (measurement != null) {
+        WALWriteUtils.write(measurement, buffer);
+      }
+    }
+
+    try {
+      putValues(buffer);
+    } catch (QueryProcessException e) {
+      logger.error("Failed to serialize values for {}", this, e);
+    }
+
+    buffer.put((byte) (isAligned ? 1 : 0));
+  }
+
+  private void putValues(IWALByteBufferView buffer) throws QueryProcessException {
+    for (int i = 0; i < values.length; i++) {
+      if (dataTypes[i] != null) {
+        if (values[i] == null) {
+          WALWriteUtils.write(TYPE_NULL, buffer);
+          continue;
+        }
+        WALWriteUtils.write(dataTypes[i], buffer);
+        switch (dataTypes[i]) {
+          case BOOLEAN:
+            WALWriteUtils.write((Boolean) values[i], buffer);
+            break;
+          case INT32:
+            WALWriteUtils.write((Integer) values[i], buffer);
+            break;
+          case INT64:
+            WALWriteUtils.write((Long) values[i], buffer);
+            break;
+          case FLOAT:
+            WALWriteUtils.write((Float) values[i], buffer);
+            break;
+          case DOUBLE:
+            WALWriteUtils.write((Double) values[i], buffer);
+            break;
+          case TEXT:
+            WALWriteUtils.write((Binary) values[i], buffer);
+            break;
+          default:
+            throw new QueryProcessException("Unsupported data type:" + dataTypes[i]);
+        }
+      }
+    }
+  }
 
   public Object[] getValues() {
     return values;
@@ -113,6 +315,140 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
   }
 
   public static InsertRowNode deserialize(ByteBuffer byteBuffer) {
-    return null;
+    InsertRowNode insertNode = new InsertRowNode(PlanNodeId.deserialize(byteBuffer));
+    insertNode.setTime(byteBuffer.getLong());
+    try {
+      insertNode.setDevicePath(new PartialPath(ReadWriteIOUtils.readString(byteBuffer)));
+    } catch (IllegalPathException e) {
+      throw new IllegalArgumentException("Cannot deserialize InsertRowNode", e);
+    }
+    insertNode.deserializeMeasurementsAndValues(byteBuffer);
+
+    return insertNode;
+  }
+
+  void deserializeMeasurementsAndValues(ByteBuffer buffer) {
+    int measurementSize = buffer.getInt();
+
+    this.measurements = new String[measurementSize];
+    this.measurementSchemas = new MeasurementSchema[measurementSize];
+    for (int i = 0; i < measurementSize; i++) {
+      measurementSchemas[i] = MeasurementSchema.deserializeFrom(buffer);
+      measurements[i] = measurementSchemas[i].getMeasurementId();
+    }
+
+    this.dataTypes = new TSDataType[measurementSize];
+    this.values = new Object[measurementSize];
+    try {
+      fillValues(buffer);
+    } catch (QueryProcessException e) {
+      e.printStackTrace();
+    }
+
+    isAligned = buffer.get() == 1;
+  }
+
+  /** Make sure the values and dataTypes arrays are already initialized before calling this. */
+  public void fillValues(ByteBuffer buffer) throws QueryProcessException {
+    for (int i = 0; i < dataTypes.length; i++) {
+      byte typeNum = (byte) ReadWriteIOUtils.read(buffer);
+      if (typeNum == TYPE_NULL) {
+        continue;
+      }
+      dataTypes[i] = TSDataType.values()[typeNum];
+      switch (dataTypes[i]) {
+        case BOOLEAN:
+          values[i] = ReadWriteIOUtils.readBool(buffer);
+          break;
+        case INT32:
+          values[i] = ReadWriteIOUtils.readInt(buffer);
+          break;
+        case INT64:
+          values[i] = ReadWriteIOUtils.readLong(buffer);
+          break;
+        case FLOAT:
+          values[i] = ReadWriteIOUtils.readFloat(buffer);
+          break;
+        case DOUBLE:
+          values[i] = ReadWriteIOUtils.readDouble(buffer);
+          break;
+        case TEXT:
+          values[i] = ReadWriteIOUtils.readBinary(buffer);
+          break;
+        default:
+          throw new QueryProcessException("Unsupported data type:" + dataTypes[i]);
+      }
+    }
+  }
+
+  public static InsertRowNode deserialize(DataInputStream stream)
+      throws IOException, IllegalPathException {
+    InsertRowNode insertNode = new InsertRowNode(PlanNodeId.deserialize(stream));
+    insertNode.setTime(stream.readLong());
+    insertNode.setDevicePath(new PartialPath(ReadWriteIOUtils.readString(stream)));
+    insertNode.deserializeMeasurementsAndValues(stream);
+
+    return insertNode;
+  }
+
+  void deserializeMeasurementsAndValues(DataInputStream stream) throws IOException {
+    int measurementSize = stream.readInt();
+
+    this.measurements = new String[measurementSize];
+    for (int i = 0; i < measurementSize; i++) {
+      measurements[i] = ReadWriteIOUtils.readString(stream);
+    }
+
+    this.dataTypes = new TSDataType[measurementSize];
+    this.values = new Object[measurementSize];
+    try {
+      fillValues(stream);
+    } catch (QueryProcessException e) {
+      e.printStackTrace();
+    }
+
+    isAligned = stream.readByte() == 1;
+  }
+
+  /** Make sure the values and dataTypes arrays are already initialized before calling this. */
+  public void fillValues(DataInputStream stream) throws QueryProcessException, IOException {
+    for (int i = 0; i < dataTypes.length; i++) {
+      byte typeNum = stream.readByte();
+      if (typeNum == TYPE_NULL) {
+        continue;
+      }
+      dataTypes[i] = TSDataType.values()[typeNum];
+      switch (dataTypes[i]) {
+        case BOOLEAN:
+          values[i] = ReadWriteIOUtils.readBool(stream);
+          break;
+        case INT32:
+          values[i] = ReadWriteIOUtils.readInt(stream);
+          break;
+        case INT64:
+          values[i] = ReadWriteIOUtils.readLong(stream);
+          break;
+        case FLOAT:
+          values[i] = ReadWriteIOUtils.readFloat(stream);
+          break;
+        case DOUBLE:
+          values[i] = ReadWriteIOUtils.readDouble(stream);
+          break;
+        case TEXT:
+          values[i] = ReadWriteIOUtils.readBinary(stream);
+          break;
+        default:
+          throw new QueryProcessException("Unsupported data type:" + dataTypes[i]);
+      }
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    if (!super.equals(o)) return false;
+    InsertRowNode that = (InsertRowNode) o;
+    return time == that.time && Arrays.equals(values, that.values);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
index cd80ae45cf..b82370ecfa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
@@ -33,6 +33,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 public class InsertRowsNode extends InsertNode {
 
@@ -97,6 +98,21 @@ public class InsertRowsNode extends InsertNode {
   @Override
   public void addChild(PlanNode child) {}
 
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    if (!super.equals(o)) return false;
+    InsertRowsNode that = (InsertRowsNode) o;
+    return Objects.equals(insertRowNodeIndexList, that.insertRowNodeIndexList)
+        && Objects.equals(insertRowNodeList, that.insertRowNodeList);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), insertRowNodeIndexList, insertRowNodeList);
+  }
+
   @Override
   public PlanNode clone() {
     throw new NotImplementedException("clone of Insert is not implemented");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 34d4e81c63..a20beb3876 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -33,6 +33,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 public class InsertRowsOfOneDeviceNode extends InsertNode {
 
@@ -135,4 +136,19 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
 
     return new ArrayList<>(splitMap.values());
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    if (!super.equals(o)) return false;
+    InsertRowsOfOneDeviceNode that = (InsertRowsOfOneDeviceNode) o;
+    return Objects.equals(insertRowNodeIndexList, that.insertRowNodeIndexList)
+        && Objects.equals(insertRowNodeList, that.insertRowNodeList);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), insertRowNodeIndexList, insertRowNodeList);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
index 2d516f1e47..f53e716680 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
@@ -21,24 +21,35 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.commons.partition.TimePartitionSlot;
 import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.utils.QueryDataSetUtils;
 import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.wal.buffer.WALEntryValue;
+import org.apache.iotdb.db.wal.utils.WALWriteUtils;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
+import java.io.DataInputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
 public class InsertTabletNode extends InsertNode implements WALEntryValue {
 
+  private static final String DATATYPE_UNSUPPORTED = "Data type %s is not supported.";
+
   private long[] times; // times should be sorted. It is done in the session API.
 
   private BitMap[] bitMaps;
@@ -138,16 +149,397 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
 
   @Override
   public int serializedSize() {
-    return 0;
+    return serializedSize(0, rowCount); // whole tablet: rows [0, rowCount)
+  }
+
+  public int serializedSize(int start, int end) {
+    int size = 0;
+    size += Short.BYTES; // plan node type tag (serializeToWAL writes it with putShort)
+    size += this.getPlanNodeId().serializedSize();
+    return size + subSerializeSize(start, end); // payload for rows [start, end)
+  }
+
+  int subSerializeSize(int start, int end) { // byte count for the payload of rows [start, end)
+    int size = 0;
+    size += ReadWriteIOUtils.sizeToWrite(devicePath.getFullPath());
+    // measurements size
+    size += Integer.BYTES;
+    for (String m : measurements) {
+      if (m != null) { // null entries are skipped, as in writeMeasurements
+        size += ReadWriteIOUtils.sizeToWrite(m);
+      }
+    }
+    // data types size
+    size += Integer.BYTES; // NOTE(review): writeDataTypes writes no count int; looks like 4 extra bytes - confirm
+    for (int i = 0; i < dataTypes.length; i++) {
+      if (measurements[i] != null) {
+        size += Byte.BYTES;
+      }
+    }
+    // times size
+    size += Integer.BYTES;
+    size += Long.BYTES * (end - start);
+    // bitmaps size
+    size += Byte.BYTES;
+    if (bitMaps != null) {
+      for (BitMap bitMap : bitMaps) {
+        size += Byte.BYTES; // NOTE(review): counted for every entry, but writeBitMaps skips null measurements - confirm
+        if (bitMap != null) {
+          int len = end - start;
+          BitMap partBitMap = new BitMap(len); // same slice that writeBitMaps(buffer, start, end) emits
+          BitMap.copyOfRange(bitMap, start, partBitMap, 0, len);
+          size += partBitMap.getByteArray().length;
+        }
+      }
+    }
+    // values size
+    for (int i = 0; i < dataTypes.length; i++) {
+      if (columns[i] != null) {
+        size += getColumnSize(dataTypes[i], columns[i], start, end);
+      }
+    }
+    size += Long.BYTES; // NOTE(review): no matching 8-byte field is written by subSerialize(IWALByteBufferView, ...) - confirm
+    size += Byte.BYTES; // isAligned flag
+    return size;
+  }
+
+  private int getColumnSize(TSDataType dataType, Object column, int start, int end) {
+    int size = 0; // bytes needed for rows [start, end) of one column
+    switch (dataType) { // NOTE(review): no default branch; other types yield 0 whereas serializeColumn throws
+      case INT32:
+        size += Integer.BYTES * (end - start);
+        break;
+      case INT64:
+        size += Long.BYTES * (end - start);
+        break;
+      case FLOAT:
+        size += Float.BYTES * (end - start);
+        break;
+      case DOUBLE:
+        size += Double.BYTES * (end - start);
+        break;
+      case BOOLEAN:
+        size += Byte.BYTES * (end - start); // one byte per boolean, as written by serializeColumn
+        break;
+      case TEXT:
+        Binary[] binaryValues = (Binary[]) column;
+        for (int j = start; j < end; j++) { // variable-length values are sized one by one
+          size += ReadWriteIOUtils.sizeToWrite(binaryValues[j]);
+        }
+        break;
+    }
+    return size;
+  }
+
+  @Override
+  public void serialize(ByteBuffer byteBuffer) {
+    byteBuffer.putShort((short) PlanNodeType.INSERT_TABLET.ordinal()); // type tag is the enum ordinal
+    getPlanNodeId().serialize(byteBuffer);
+    subSerialize(byteBuffer); // device path, measurements, types, times, bitmaps, values, aligned flag
+  }
+
+  void subSerialize(ByteBuffer buffer) { // field order must match subDeserialize(ByteBuffer)
+    ReadWriteIOUtils.write(devicePath.getFullPath(), buffer);
+    writeMeasurements(buffer);
+    writeDataTypes(buffer);
+    writeTimes(buffer);
+    writeBitMaps(buffer);
+    writeValues(buffer);
+    buffer.put((byte) (isAligned ? 1 : 0)); // read back as `buffer.get() == 1`
+  }
+
+  private void writeMeasurements(ByteBuffer buffer) {
+    buffer.putInt(measurementSchemas.length - countFailedMeasurements()); // count written before the schemas
+    for (MeasurementSchema measurement : measurementSchemas) {
+      if (measurement != null) { // null schemas are not written
+        measurement.serializeTo(buffer);
+      }
+    }
+  }
+
+  private void writeDataTypes(ByteBuffer buffer) { // no count prefix; the reader reuses the measurement count
+    for (TSDataType dataType : dataTypes) {
+      if (dataType == null) { // null types are not written
+        continue;
+      }
+      dataType.serializeTo(buffer);
+    }
+  }
+
+  private void writeTimes(ByteBuffer buffer) {
+    buffer.putInt(rowCount); // number of timestamps that follow
+    for (int i = 0; i < rowCount; i++) { // exactly rowCount entries, like serializeColumn, even if times is longer
+      buffer.putLong(times[i]);
+    }
+  }
+
+  private void writeBitMaps(ByteBuffer buffer) {
+    buffer.put(BytesUtils.boolToByte(bitMaps != null)); // presence flag for the whole bitmap section
+    if (bitMaps != null) {
+      for (int i = 0; i < measurements.length; i++) {
+        // check failed measurement
+        if (measurements[i] != null) {
+          BitMap bitMap = bitMaps[i];
+          if (bitMap == null) {
+            buffer.put(BytesUtils.boolToByte(false)); // per-column flag: no bitmap for this column
+          } else {
+            buffer.put(BytesUtils.boolToByte(true));
+            buffer.put(bitMap.getByteArray()); // NOTE(review): whole backing array; assumes it is sized for rowCount - confirm
+          }
+        }
+      }
+    }
+  }
+
+  private void writeValues(ByteBuffer buffer) {
+    for (int i = 0; i < dataTypes.length; i++) {
+      if (columns[i] == null) { // columns without data are not written
+        continue;
+      }
+      serializeColumn(dataTypes[i], columns[i], buffer);
+    }
+  }
+
+  private void serializeColumn(TSDataType dataType, Object column, ByteBuffer buffer) {
+    switch (dataType) { // each branch writes the first rowCount values of the column
+      case INT32:
+        int[] intValues = (int[]) column;
+        for (int j = 0; j < rowCount; j++) {
+          buffer.putInt(intValues[j]);
+        }
+        break;
+      case INT64:
+        long[] longValues = (long[]) column;
+        for (int j = 0; j < rowCount; j++) {
+          buffer.putLong(longValues[j]);
+        }
+        break;
+      case FLOAT:
+        float[] floatValues = (float[]) column;
+        for (int j = 0; j < rowCount; j++) {
+          buffer.putFloat(floatValues[j]);
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValues = (double[]) column;
+        for (int j = 0; j < rowCount; j++) {
+          buffer.putDouble(doubleValues[j]);
+        }
+        break;
+      case BOOLEAN:
+        boolean[] boolValues = (boolean[]) column;
+        for (int j = 0; j < rowCount; j++) {
+          buffer.put(BytesUtils.boolToByte(boolValues[j])); // one byte per boolean
+        }
+        break;
+      case TEXT:
+        Binary[] binaryValues = (Binary[]) column;
+        for (int j = 0; j < rowCount; j++) {
+          buffer.putInt(binaryValues[j].getLength()); // length prefix, then the raw bytes
+          buffer.put(binaryValues[j].getValues());
+        }
+        break;
+      default:
+        throw new UnSupportedDataTypeException(String.format(DATATYPE_UNSUPPORTED, dataType));
+    }
+  }
+
+  @Override
+  public void serializeToWAL(IWALByteBufferView buffer) {
+    serializeToWAL(buffer, 0, rowCount); // whole tablet: rows [0, rowCount)
+  }
+
+  public void serializeToWAL(IWALByteBufferView buffer, int start, int end) {
+    buffer.putShort((short) PlanNodeType.INSERT_TABLET.ordinal()); // type tag is the enum ordinal
+    getPlanNodeId().serializeToWAL(buffer);
+    subSerialize(buffer, start, end); // payload restricted to rows [start, end)
+  }
+
+  void subSerialize(IWALByteBufferView buffer, int start, int end) { // order mirrors subDeserialize(DataInputStream)
+    WALWriteUtils.write(devicePath.getFullPath(), buffer);
+    writeMeasurements(buffer);
+    writeDataTypes(buffer);
+    writeTimes(buffer, start, end);
+    writeBitMaps(buffer, start, end);
+    writeValues(buffer, start, end);
+    buffer.put((byte) (isAligned ? 1 : 0)); // read back as `stream.readByte() == 1`
+  }
+
+  private void writeMeasurements(IWALByteBufferView buffer) {
+    buffer.putInt(measurementSchemas.length - countFailedMeasurements()); // count written before the names
+    for (String m : measurements) { // the WAL form stores measurement names only, not schemas
+      if (m != null) {
+        WALWriteUtils.write(m, buffer);
+      }
+    }
   }
 
   @Override
-  public void serialize(ByteBuffer byteBuffer) {}
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    if (!super.equals(o)) return false; // superclass fields are compared first
+    InsertTabletNode that = (InsertTabletNode) o;
+    return rowCount == that.rowCount
+        && Arrays.equals(times, that.times)
+        && Arrays.equals(bitMaps, that.bitMaps) // element comparison relies on BitMap.equals
+        && equals(that.columns) // type-aware comparison of the value columns
+        && Objects.equals(range, that.range);
+  }
+
+  private boolean equals(Object[] columns) { // content comparison of value columns, typed via this.dataTypes
+    if (this.columns == columns) {
+      return true;
+    }
+
+    if (columns == null || this.columns == null || columns.length != this.columns.length) {
+      return false;
+    }
+
+    for (int i = 0; i < columns.length; i++) {
+      if (dataTypes[i] != null) {
+        switch (dataTypes[i]) { // cast to the concrete array type so Arrays.equals compares contents
+          case INT32:
+            if (!Arrays.equals((int[]) this.columns[i], (int[]) columns[i])) {
+              return false;
+            }
+            break;
+          case INT64:
+            if (!Arrays.equals((long[]) this.columns[i], (long[]) columns[i])) {
+              return false;
+            }
+            break;
+          case FLOAT:
+            if (!Arrays.equals((float[]) this.columns[i], (float[]) columns[i])) {
+              return false;
+            }
+            break;
+          case DOUBLE:
+            if (!Arrays.equals((double[]) this.columns[i], (double[]) columns[i])) {
+              return false;
+            }
+            break;
+          case BOOLEAN:
+            if (!Arrays.equals((boolean[]) this.columns[i], (boolean[]) columns[i])) {
+              return false;
+            }
+            break;
+          case TEXT:
+            if (!Arrays.equals((Binary[]) this.columns[i], (Binary[]) columns[i])) {
+              return false;
+            }
+            break;
+          default:
+            throw new UnSupportedDataTypeException(
+                String.format(DATATYPE_UNSUPPORTED, dataTypes[i]));
+        }
+      } else if (!Objects.equals(this.columns[i], columns[i])) { // no data type: null-safe element compare
+        return false;
+      }
+    }
+
+    return true;
+  }
 
   @Override
-  public void serializeToWAL(IWALByteBufferView buffer) {}
+  public int hashCode() {
+    int result = Objects.hash(super.hashCode(), rowCount, range);
+    result = 31 * result + Arrays.hashCode(times);
+    result = 31 * result + Arrays.hashCode(bitMaps);
+    result = 31 * result + Arrays.deepHashCode(columns); // nested arrays hashed by content, consistent with equals
+    return result;
+  }
+
+  private void writeDataTypes(IWALByteBufferView buffer) { // no count prefix; the reader reuses the measurement count
+    for (TSDataType dataType : dataTypes) {
+      if (dataType == null) { // null types are not written
+        continue;
+      }
+      WALWriteUtils.write(dataType, buffer);
+    }
+  }
+
+  private void writeTimes(IWALByteBufferView buffer, int start, int end) {
+    buffer.putInt(end - start); // row count of the slice
+    for (int i = start; i < end; i++) { // timestamps of rows [start, end)
+      buffer.putLong(times[i]);
+    }
+  }
+
+  private void writeBitMaps(IWALByteBufferView buffer, int start, int end) {
+    buffer.put(BytesUtils.boolToByte(bitMaps != null)); // presence flag for the whole bitmap section
+    if (bitMaps != null) {
+      for (int i = 0; i < measurements.length; i++) {
+        // check failed measurement
+        if (measurements[i] != null) {
+          BitMap bitMap = bitMaps[i];
+          if (bitMap == null) {
+            buffer.put(BytesUtils.boolToByte(false)); // per-column flag: no bitmap for this column
+          } else {
+            buffer.put(BytesUtils.boolToByte(true));
+            int len = end - start;
+            BitMap partBitMap = new BitMap(len); // copy only the bits of rows [start, end)
+            BitMap.copyOfRange(bitMap, start, partBitMap, 0, len);
+            buffer.put(partBitMap.getByteArray());
+          }
+        }
+      }
+    }
+  }
+
+  private void writeValues(IWALByteBufferView buffer, int start, int end) {
+    for (int i = 0; i < dataTypes.length; i++) {
+      if (columns[i] == null) { // columns without data are not written
+        continue;
+      }
+      serializeColumn(dataTypes[i], columns[i], buffer, start, end);
+    }
+  }
 
-  public void serializeToWAL(IWALByteBufferView buffer, int start, int end) {}
+  private void serializeColumn(
+      TSDataType dataType, Object column, IWALByteBufferView buffer, int start, int end) {
+    switch (dataType) { // each branch writes rows [start, end) of the column
+      case INT32:
+        int[] intValues = (int[]) column;
+        for (int j = start; j < end; j++) {
+          buffer.putInt(intValues[j]);
+        }
+        break;
+      case INT64:
+        long[] longValues = (long[]) column;
+        for (int j = start; j < end; j++) {
+          buffer.putLong(longValues[j]);
+        }
+        break;
+      case FLOAT:
+        float[] floatValues = (float[]) column;
+        for (int j = start; j < end; j++) {
+          buffer.putFloat(floatValues[j]);
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValues = (double[]) column;
+        for (int j = start; j < end; j++) {
+          buffer.putDouble(doubleValues[j]);
+        }
+        break;
+      case BOOLEAN:
+        boolean[] boolValues = (boolean[]) column;
+        for (int j = start; j < end; j++) {
+          buffer.put(BytesUtils.boolToByte(boolValues[j])); // one byte per boolean
+        }
+        break;
+      case TEXT:
+        Binary[] binaryValues = (Binary[]) column;
+        for (int j = start; j < end; j++) {
+          buffer.putInt(binaryValues[j].getLength()); // length prefix, then the raw bytes
+          buffer.put(binaryValues[j].getValues());
+        }
+        break;
+      default:
+        throw new UnSupportedDataTypeException(String.format(DATATYPE_UNSUPPORTED, dataType));
+    }
+  }
 
   @Override
   public List<WritePlanNode> splitByPartition(Analysis analysis) {
@@ -281,6 +673,77 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
   }
 
   public static InsertTabletNode deserialize(ByteBuffer byteBuffer) {
-    return null;
+    InsertTabletNode insertNode = new InsertTabletNode(PlanNodeId.deserialize(byteBuffer));
+    try {
+      insertNode.subDeserialize(byteBuffer); // reads the fields in the order subSerialize(ByteBuffer) wrote them
+    } catch (IllegalPathException e) { // bad device path: rethrown unchecked with the cause preserved
+      throw new IllegalArgumentException("Cannot deserialize InsertTabletNode", e);
+    }
+    return insertNode;
+  }
+
+  private void subDeserialize(ByteBuffer buffer) throws IllegalPathException {
+    this.devicePath = new PartialPath(ReadWriteIOUtils.readString(buffer));
+
+    int measurementSize = buffer.getInt(); // count written by writeMeasurements(ByteBuffer)
+    this.measurements = new String[measurementSize];
+    this.measurementSchemas = new MeasurementSchema[measurementSize];
+    for (int i = 0; i < measurementSize; i++) {
+      measurementSchemas[i] = MeasurementSchema.deserializeFrom(buffer);
+      measurements[i] = measurementSchemas[i].getMeasurementId(); // names are derived from the schemas
+    }
+
+    this.dataTypes = new TSDataType[measurementSize];
+    for (int i = 0; i < measurementSize; i++) {
+      dataTypes[i] = TSDataType.deserialize(buffer.get()); // one byte per data type
+    }
+
+    int rows = buffer.getInt();
+    rowCount = rows;
+    this.times = new long[rows]; // NOTE(review): redundant; replaced by the array read on the next line
+    times = QueryDataSetUtils.readTimesFromBuffer(buffer, rows);
+
+    boolean hasBitMaps = BytesUtils.byteToBool(buffer.get());
+    if (hasBitMaps) { // bitMaps is left unassigned when the flag is false
+      bitMaps = QueryDataSetUtils.readBitMapsFromBuffer(buffer, measurementSize, rows);
+    }
+    columns =
+        QueryDataSetUtils.readTabletValuesFromBuffer(buffer, dataTypes, measurementSize, rows);
+    this.isAligned = buffer.get() == 1;
+  }
+
+  public static InsertTabletNode deserialize(DataInputStream stream)
+      throws IllegalPathException, IOException { // unlike the ByteBuffer overload, exceptions propagate unwrapped
+    InsertTabletNode insertNode = new InsertTabletNode(PlanNodeId.deserialize(stream));
+    insertNode.subDeserialize(stream);
+    return insertNode;
+  }
+
+  private void subDeserialize(DataInputStream stream) throws IllegalPathException, IOException {
+    this.devicePath = new PartialPath(ReadWriteIOUtils.readString(stream));
+
+    int measurementSize = stream.readInt();
+    this.measurements = new String[measurementSize]; // only names are read; measurementSchemas is not assigned here
+    for (int i = 0; i < measurementSize; i++) {
+      measurements[i] = ReadWriteIOUtils.readString(stream);
+    }
+
+    this.dataTypes = new TSDataType[measurementSize];
+    for (int i = 0; i < measurementSize; i++) {
+      dataTypes[i] = TSDataType.deserialize(stream.readByte()); // one byte per data type
+    }
+
+    int rows = stream.readInt();
+    rowCount = rows;
+    this.times = new long[rows]; // NOTE(review): redundant; replaced by the array read on the next line
+    times = QueryDataSetUtils.readTimesFromStream(stream, rows);
+
+    boolean hasBitMaps = BytesUtils.byteToBool(stream.readByte());
+    if (hasBitMaps) { // bitMaps is left unassigned when the flag is false
+      bitMaps = QueryDataSetUtils.readBitMapsFromStream(stream, measurementSize, rows);
+    }
+    columns =
+        QueryDataSetUtils.readTabletValuesFromStream(stream, dataTypes, measurementSize, rows);
+    this.isAligned = stream.readByte() == 1;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java
index 9755950521..23d740315e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java
@@ -78,8 +78,8 @@ public class InsertTabletStatement extends InsertBaseStatement {
     }
     super.markFailedMeasurementInsertion(index, e);
     dataTypes[index] = null;
-    times[index] = Long.MAX_VALUE;
     columns[index] = null;
+    bitMaps[index] = null;
   }
 
   public List<TimePartitionSlot> getTimePartitionSlots() {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java
index 705916eeae..0a8da8ce8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.memtable.AbstractMemTable;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -159,10 +160,10 @@ public class WALEntry implements SerializedSize {
         value = AbstractMemTable.Factory.create(stream);
         break;
       case INSERT_ROW_NODE:
-        // TODO
+        value = (InsertRowNode) PlanNodeType.deserialize(stream);
         break;
       case INSERT_TABLET_NODE:
-        // TODO
+        value = (InsertTabletNode) PlanNodeType.deserialize(stream);
         break;
     }
     return new WALEntry(type, memTableId, value);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowNodeSerdeTest.java
new file mode 100644
index 0000000000..ccbe06f31e
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowNodeSerdeTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.plan.node.write;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+public class InsertRowNodeSerdeTest { // round-trip checks for InsertRowNode ByteBuffer serialization
+
+  @Test
+  public void TestSerializeAndDeserialize() throws IllegalPathException {
+    InsertRowNode insertRowNode = getInsertRowNode();
+
+    ByteBuffer byteBuffer = ByteBuffer.allocate(10000);
+    insertRowNode.serialize(byteBuffer);
+    byteBuffer.flip();
+
+    Assert.assertEquals(PlanNodeType.INSERT_ROW.ordinal(), byteBuffer.getShort()); // consume the type tag
+
+    Assert.assertEquals(insertRowNode, InsertRowNode.deserialize(byteBuffer)); // expected value goes first
+
+    // Test with failed column
+    insertRowNode = getInsertRowNodeWithFailedColumn();
+    byteBuffer = ByteBuffer.allocate(10000);
+    insertRowNode.serialize(byteBuffer);
+    byteBuffer.flip();
+
+    Assert.assertEquals(PlanNodeType.INSERT_ROW.ordinal(), byteBuffer.getShort());
+
+    InsertRowNode tmpNode = InsertRowNode.deserialize(byteBuffer);
+
+    Assert.assertEquals(insertRowNode.getTime(), tmpNode.getTime());
+    Assert.assertArrayEquals(new String[] {"s1", "s3", "s5"}, tmpNode.getMeasurements()); // failed columns dropped
+  }
+
+  private InsertRowNode getInsertRowNode() throws IllegalPathException {
+    long time = 110L;
+    TSDataType[] dataTypes =
+        new TSDataType[] {
+          TSDataType.DOUBLE,
+          TSDataType.FLOAT,
+          TSDataType.INT64,
+          TSDataType.INT32,
+          TSDataType.BOOLEAN,
+        };
+
+    Object[] columns = new Object[5];
+    columns[0] = 1.0;
+    columns[1] = 2.0f;
+    columns[2] = 10000L;
+    columns[3] = 100;
+    columns[4] = false;
+
+    return new InsertRowNode(
+        new PlanNodeId("plannode 1"),
+        new PartialPath("root.isp.d1"),
+        false,
+        new MeasurementSchema[] {
+          new MeasurementSchema("s1", TSDataType.DOUBLE),
+          new MeasurementSchema("s2", TSDataType.FLOAT),
+          new MeasurementSchema("s3", TSDataType.INT64),
+          new MeasurementSchema("s4", TSDataType.INT32),
+          new MeasurementSchema("s5", TSDataType.BOOLEAN)
+        },
+        dataTypes,
+        time,
+        columns);
+  }
+
+  private InsertRowNode getInsertRowNodeWithFailedColumn() throws IllegalPathException {
+    long time = 110L;
+    TSDataType[] dataTypes =
+        new TSDataType[] {
+          TSDataType.DOUBLE, null, TSDataType.INT64, null, TSDataType.BOOLEAN,
+        };
+
+    Object[] columns = new Object[5];
+    columns[0] = 1.0;
+    columns[1] = null;
+    columns[2] = 10000L;
+    columns[3] = null;
+    columns[4] = false;
+
+    return new InsertRowNode(
+        new PlanNodeId("plannode 1"),
+        new PartialPath("root.isp.d1"),
+        false,
+        new MeasurementSchema[] {
+          new MeasurementSchema("s1", TSDataType.DOUBLE),
+          null,
+          new MeasurementSchema("s3", TSDataType.INT64),
+          null,
+          new MeasurementSchema("s5", TSDataType.BOOLEAN)
+        },
+        dataTypes,
+        time,
+        columns);
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertTabletNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertTabletNodeSerdeTest.java
new file mode 100644
index 0000000000..b71fc44f40
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertTabletNodeSerdeTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.plan.node.write;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+public class InsertTabletNodeSerdeTest { // round-trip check for InsertTabletNode ByteBuffer serialization
+
+  @Test
+  public void TestSerializeAndDeserialize() throws IllegalPathException {
+    InsertTabletNode insertTabletNode = getInsertTabletNode();
+
+    ByteBuffer byteBuffer = ByteBuffer.allocate(10000);
+    insertTabletNode.serialize(byteBuffer);
+    byteBuffer.flip();
+
+    Assert.assertEquals(PlanNodeType.INSERT_TABLET.ordinal(), byteBuffer.getShort()); // consume the type tag
+
+    Assert.assertEquals(insertTabletNode, InsertTabletNode.deserialize(byteBuffer)); // expected value goes first
+  }
+
+  private InsertTabletNode getInsertTabletNode() throws IllegalPathException {
+    long[] times = new long[] {110L, 111L, 112L, 113L};
+    TSDataType[] dataTypes = new TSDataType[5];
+    dataTypes[0] = TSDataType.DOUBLE;
+    dataTypes[1] = TSDataType.FLOAT;
+    dataTypes[2] = TSDataType.INT64;
+    dataTypes[3] = TSDataType.INT32;
+    dataTypes[4] = TSDataType.BOOLEAN;
+
+    Object[] columns = new Object[5];
+    columns[0] = new double[4];
+    columns[1] = new float[4];
+    columns[2] = new long[4];
+    columns[3] = new int[4];
+    columns[4] = new boolean[4];
+
+    for (int r = 0; r < 4; r++) { // same values in every row
+      ((double[]) columns[0])[r] = 1.0;
+      ((float[]) columns[1])[r] = 2;
+      ((long[]) columns[2])[r] = 10000;
+      ((int[]) columns[3])[r] = 100;
+      ((boolean[]) columns[4])[r] = false;
+    }
+
+    InsertTabletNode tabletNode =
+        new InsertTabletNode(
+            new PlanNodeId("plannode 1"),
+            new PartialPath("root.isp.d1"),
+            false,
+            new MeasurementSchema[] {
+              new MeasurementSchema("s1", TSDataType.DOUBLE),
+              new MeasurementSchema("s2", TSDataType.FLOAT),
+              new MeasurementSchema("s3", TSDataType.INT64),
+              new MeasurementSchema("s4", TSDataType.INT32),
+              new MeasurementSchema("s5", TSDataType.BOOLEAN)
+            },
+            dataTypes,
+            times,
+            null, // no bitmaps
+            columns,
+            times.length);
+
+    return tabletNode;
+  }
+}