You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/05/04 14:42:18 UTC

[iotdb] branch master updated: [IOTDB-3034] Partial insert in new cluster (#5763)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 83590e9f08 [IOTDB-3034] Partial insert in new cluster (#5763)
83590e9f08 is described below

commit 83590e9f08b8f517fd185b17bfa5ad484db523f1
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Wed May 4 22:42:11 2022 +0800

    [IOTDB-3034] Partial insert in new cluster (#5763)
---
 .../iotdb/db/engine/memtable/AbstractMemTable.java |  28 +-
 .../iotdb/db/engine/storagegroup/DataRegion.java   |  14 +
 .../iotdb/db/mpp/plan/analyze/SchemaValidator.java |   2 +-
 .../plan/node/write/InsertMultiTabletsNode.java    |  11 +-
 .../plan/planner/plan/node/write/InsertNode.java   | 176 +++++---
 .../planner/plan/node/write/InsertRowNode.java     | 378 ++++++++--------
 .../planner/plan/node/write/InsertRowsNode.java    |  11 +-
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |  11 +-
 .../planner/plan/node/write/InsertTabletNode.java  | 482 +++++++++++----------
 .../service/thrift/impl/InternalServiceImpl.java   |   4 +-
 .../java/org/apache/iotdb/db/utils/MemUtils.java   |   6 +-
 .../apache/iotdb/db/wal/utils/WALWriteUtils.java   |  17 +
 .../write/InsertMultiTabletsNodeSerdeTest.java     |  10 +-
 .../plan/node/write/InsertRowNodeSerdeTest.java    |  17 +-
 .../plan/node/write/InsertRowsNodeSerdeTest.java   |   2 +-
 .../write/InsertRowsOfOneDeviceNodeSerdeTest.java  |   2 +-
 .../plan/node/write/InsertTabletNodeSerdeTest.java |  16 +-
 .../org/apache/iotdb/db/wal/io/WALFileTest.java    | 114 +++++
 .../java/org/apache/iotdb/tsfile/utils/BitMap.java |  23 +
 19 files changed, 752 insertions(+), 572 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index ddd98b3987..cf045e3fb9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -221,10 +221,17 @@ public abstract class AbstractMemTable implements IMemTable {
 
     List<IMeasurementSchema> schemaList = new ArrayList<>();
     List<TSDataType> dataTypes = new ArrayList<>();
+    int nullPointsNumber = 0;
     for (int i = 0; i < insertRowNode.getMeasurements().length; i++) {
+      // use measurements[i] to ignore failed partial insert
       if (measurements[i] == null) {
         continue;
       }
+      // use values[i] to ignore null value
+      if (values[i] == null) {
+        nullPointsNumber++;
+        continue;
+      }
       IMeasurementSchema schema = insertRowNode.getMeasurementSchemas()[i];
       schemaList.add(schema);
       dataTypes.add(schema.getType());
@@ -232,7 +239,10 @@ public abstract class AbstractMemTable implements IMemTable {
     memSize += MemUtils.getRecordsSize(dataTypes, values, disableMemControl);
     write(insertRowNode.getDeviceID(), schemaList, insertRowNode.getTime(), values);
 
-    int pointsInserted = insertRowNode.getMeasurements().length;
+    int pointsInserted =
+        insertRowNode.getMeasurements().length
+            - insertRowNode.getFailedMeasurementNumber()
+            - nullPointsNumber;
 
     totalPointsNum += pointsInserted;
 
@@ -295,16 +305,17 @@ public abstract class AbstractMemTable implements IMemTable {
   public void insertAlignedRow(InsertRowNode insertRowNode) {
     // if this insert node isn't from storage engine, we should set a temp device id for it
     if (insertRowNode.getDeviceID() == null) {
-      insertRowNode.setDeviceID(
-          DeviceIDFactory.getInstance().getDeviceID(insertRowNode.getDevicePath()));
+      insertRowNode.setDeviceID(deviceIDFactory.getDeviceID(insertRowNode.getDevicePath()));
     }
 
-    // updatePlanIndexes(insertRowNode.getIndex());
+    // TODO updatePlanIndexes(insertRowNode.getIndex());
     updatePlanIndexes(0);
     String[] measurements = insertRowNode.getMeasurements();
+    Object[] values = insertRowNode.getValues();
     List<IMeasurementSchema> schemaList = new ArrayList<>();
     List<TSDataType> dataTypes = new ArrayList<>();
     for (int i = 0; i < insertRowNode.getMeasurements().length; i++) {
+      // use measurements[i] to ignore failed partial insert
       if (measurements[i] == null) {
         continue;
       }
@@ -315,13 +326,8 @@ public abstract class AbstractMemTable implements IMemTable {
     if (schemaList.isEmpty()) {
       return;
     }
-    memSize +=
-        MemUtils.getAlignedRecordsSize(dataTypes, insertRowNode.getValues(), disableMemControl);
-    writeAlignedRow(
-        insertRowNode.getDeviceID(),
-        schemaList,
-        insertRowNode.getTime(),
-        insertRowNode.getValues());
+    memSize += MemUtils.getAlignedRecordsSize(dataTypes, values, disableMemControl);
+    writeAlignedRow(insertRowNode.getDeviceID(), schemaList, insertRowNode.getTime(), values);
     int pointsInserted = insertRowNode.getMeasurements().length;
     totalPointsNum += pointsInserted;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index ce0644859e..7112596005 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -866,6 +866,13 @@ public class DataRegion {
     } finally {
       writeUnlock();
     }
+
+    if (insertRowNode.hasFailedMeasurements()) {
+      logger.warn(
+          "Fail to insert measurements {} caused by {}",
+          insertRowNode.getFailedMeasurements(),
+          insertRowNode.getFailedMessages());
+    }
   }
 
   /**
@@ -1081,6 +1088,13 @@ public class DataRegion {
     } finally {
       writeUnlock();
     }
+
+    if (insertTabletNode.hasFailedMeasurements()) {
+      logger.warn(
+          "Fail to insert measurements {} caused by {}",
+          insertTabletNode.getFailedMeasurements(),
+          insertTabletNode.getFailedMessages());
+    }
   }
 
   /** @return whether the given time falls in ttl */
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
index 748257c328..c0f5b5f412 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
@@ -48,7 +48,7 @@ public class SchemaValidator {
               insertNode.isAligned());
     }
 
-    if (!insertNode.validateSchema(schemaTree)) {
+    if (!insertNode.validateAndSetSchema(schemaTree)) {
       throw new SemanticException("Data type mismatch");
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index 1f9e324358..13cf231923 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -117,9 +117,9 @@ public class InsertMultiTabletsNode extends InsertNode implements BatchInsertNod
   }
 
   @Override
-  public boolean validateSchema(SchemaTree schemaTree) {
+  public boolean validateAndSetSchema(SchemaTree schemaTree) {
     for (InsertTabletNode insertTabletNode : insertTabletNodeList) {
-      if (!insertTabletNode.validateSchema(schemaTree)) {
+      if (!insertTabletNode.validateAndSetSchema(schemaTree)) {
         return false;
       }
     }
@@ -179,13 +179,6 @@ public class InsertMultiTabletsNode extends InsertNode implements BatchInsertNod
     return null;
   }
 
-  @Override
-  public void setMeasurementSchemas(SchemaTree schemaTree) {
-    for (InsertTabletNode insertTabletNode : insertTabletNodeList) {
-      insertTabletNode.setMeasurementSchemas(schemaTree);
-    }
-  }
-
   @Override
   public List<PartialPath> getDevicePaths() {
     List<PartialPath> partialPaths = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index 45b54aa979..48252bbab7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -20,27 +20,27 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
 import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
-import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
 import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.wal.utils.WALWriteUtils;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 public abstract class InsertNode extends WritePlanNode {
 
@@ -57,6 +57,9 @@ public abstract class InsertNode extends WritePlanNode {
   // TODO(INSERT) need to change it to a function handle to update last time value
   //  protected IMeasurementMNode[] measurementMNodes;
 
+  /** index of failed measurements -> info including measurement, data type, value and cause */
+  protected Map<Integer, FailedMeasurementInfo> failedMeasurementIndex2Info;
+
   /**
    * device id reference, for reuse device id in both id table and memtable <br>
    * used in memtable
@@ -135,55 +138,37 @@ public abstract class InsertNode extends WritePlanNode {
     this.deviceID = deviceID;
   }
 
-  protected void serializeMeasurementSchemaToWAL(IWALByteBufferView buffer) {
-    for (MeasurementSchema measurementSchema : measurementSchemas) {
-      WALWriteUtils.write(measurementSchema, buffer);
-    }
-  }
-
-  protected int serializeMeasurementSchemaSize() {
+  /** Serialized size of measurement schemas, ignoring failed time series */
+  protected int serializeMeasurementSchemasSize() {
     int byteLen = 0;
-    for (MeasurementSchema measurementSchema : measurementSchemas) {
-      byteLen += ReadWriteIOUtils.sizeToWrite(measurementSchema.getMeasurementId());
-      byteLen += 3 * Byte.BYTES;
-      Map<String, String> props = measurementSchema.getProps();
-      if (props == null) {
-        byteLen += Integer.BYTES;
-      } else {
-        byteLen += Integer.BYTES;
-        for (Map.Entry<String, String> entry : props.entrySet()) {
-          byteLen += ReadWriteIOUtils.sizeToWrite(entry.getKey());
-          byteLen += ReadWriteIOUtils.sizeToWrite(entry.getValue());
-        }
+    for (int i = 0; i < measurements.length; i++) {
+      // ignore failed partial insert
+      if (measurements[i] == null) {
+        continue;
       }
+      byteLen += WALWriteUtils.sizeToWrite(measurementSchemas[i]);
     }
     return byteLen;
   }
 
-  /** Make sure the measurement schema is already inited before calling this */
-  protected void deserializeMeasurementSchema(DataInputStream stream) throws IOException {
-    for (int i = 0; i < measurementSchemas.length; i++) {
-
-      measurementSchemas[i] =
-          new MeasurementSchema(
-              ReadWriteIOUtils.readString(stream),
-              TSDataType.deserialize(ReadWriteIOUtils.readByte(stream)),
-              TSEncoding.deserialize(ReadWriteIOUtils.readByte(stream)),
-              CompressionType.deserialize(ReadWriteIOUtils.readByte(stream)));
-
-      int size = ReadWriteIOUtils.readInt(stream);
-      if (size > 0) {
-        Map<String, String> props = new HashMap<>();
-        String key;
-        String value;
-        for (int j = 0; j < size; j++) {
-          key = ReadWriteIOUtils.readString(stream);
-          value = ReadWriteIOUtils.readString(stream);
-          props.put(key, value);
-        }
-        measurementSchemas[i].setProps(props);
+  /** Serialize measurement schemas, ignoring failed time series */
+  protected void serializeMeasurementSchemasToWAL(IWALByteBufferView buffer) {
+    for (int i = 0; i < measurements.length; i++) {
+      // ignore failed partial insert
+      if (measurements[i] == null) {
+        continue;
       }
+      WALWriteUtils.write(measurementSchemas[i], buffer);
+    }
+  }
 
+  /**
+   * Deserialize measurement schemas. Make sure the measurement schemas and measurements have been
+   * created before calling this
+   */
+  protected void deserializeMeasurementSchemas(DataInputStream stream) throws IOException {
+    for (int i = 0; i < measurementSchemas.length; i++) {
+      measurementSchemas[i] = MeasurementSchema.deserializeFrom(stream);
       measurements[i] = measurementSchemas[i].getMeasurementId();
     }
   }
@@ -192,33 +177,96 @@ public abstract class InsertNode extends WritePlanNode {
     return dataRegionReplicaSet;
   }
 
-  public abstract boolean validateSchema(SchemaTree schemaTree);
+  public abstract boolean validateAndSetSchema(SchemaTree schemaTree);
 
-  public void setMeasurementSchemas(SchemaTree schemaTree) {
-    DeviceSchemaInfo deviceSchemaInfo =
-        schemaTree.searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements));
-    measurementSchemas =
-        deviceSchemaInfo.getMeasurementSchemaList().toArray(new MeasurementSchema[0]);
+  /** Check whether data types are matched with measurement schemas */
+  protected boolean selfCheckDataTypes() {
+    for (int i = 0; i < measurementSchemas.length; i++) {
+      if (dataTypes[i] != measurementSchemas[i].getType()) {
+        if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+          return false;
+        } else {
+          markFailedMeasurement(
+              i,
+              new DataTypeMismatchException(
+                  devicePath.getFullPath(),
+                  measurements[i],
+                  measurementSchemas[i].getType(),
+                  dataTypes[i]));
+        }
+      }
+    }
+    return true;
   }
 
+  // region partial insert
   /**
-   * This method is overrided in InsertRowPlan and InsertTabletPlan. After marking failed
-   * measurements, the failed values or columns would be null as well. We'd better use
-   * "measurements[index] == null" to determine if the measurement failed.
+   * Mark the measurement at index as failed: measurements[index], dataTypes[index] and
+   * values/columns[index] are set to null, so "measurements[index] == null" identifies a failure.
+   * <br>
+   * This method is not concurrency-safe.
    *
    * @param index failed measurement index
+   * @param cause cause Exception of failure
    */
-  public void markFailedMeasurementInsertion(int index, Exception e) {
-    // todo partial insert
-    if (measurements[index] == null) {
-      return;
+  public void markFailedMeasurement(int index, Exception cause) {
+    throw new UnsupportedOperationException();
+  }
+
+  public boolean hasFailedMeasurements() {
+    return failedMeasurementIndex2Info != null && !failedMeasurementIndex2Info.isEmpty();
+  }
+
+  public int getFailedMeasurementNumber() {
+    return failedMeasurementIndex2Info == null ? 0 : failedMeasurementIndex2Info.size();
+  }
+
+  public List<String> getFailedMeasurements() {
+    return failedMeasurementIndex2Info == null
+        ? Collections.emptyList()
+        : failedMeasurementIndex2Info.values().stream()
+            .map(info -> info.measurement)
+            .collect(Collectors.toList());
+  }
+
+  public List<Exception> getFailedExceptions() {
+    return failedMeasurementIndex2Info == null
+        ? Collections.emptyList()
+        : failedMeasurementIndex2Info.values().stream()
+            .map(info -> info.cause)
+            .collect(Collectors.toList());
+  }
+
+  public List<String> getFailedMessages() {
+    return failedMeasurementIndex2Info == null
+        ? Collections.emptyList()
+        : failedMeasurementIndex2Info.values().stream()
+            .map(
+                info -> {
+                  Throwable cause = info.cause;
+                  while (cause.getCause() != null) {
+                    cause = cause.getCause();
+                  }
+                  return cause.getMessage();
+                })
+            .collect(Collectors.toList());
+  }
+
+  protected static class FailedMeasurementInfo {
+    protected String measurement;
+    protected TSDataType dataType;
+    protected Object value;
+    protected Exception cause;
+
+    public FailedMeasurementInfo(
+        String measurement, TSDataType dataType, Object value, Exception cause) {
+      this.measurement = measurement;
+      this.dataType = dataType;
+      this.value = value;
+      this.cause = cause;
     }
-    //    if (failedMeasurements == null) {
-    //      failedMeasurements = new ArrayList<>();
-    //    }
-    //    failedMeasurements.add(measurements[index]);
-    measurements[index] = null;
   }
+  // endregion
 
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
index f63ee885e4..61a8081909 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngineV2;
-import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
@@ -54,6 +53,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Objects;
 
@@ -165,159 +165,90 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
   }
 
   @Override
-  public boolean validateSchema(SchemaTree schemaTree) {
+  public boolean validateAndSetSchema(SchemaTree schemaTree) {
     DeviceSchemaInfo deviceSchemaInfo =
         schemaTree.searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements));
-
-    List<MeasurementSchema> measurementSchemas = deviceSchemaInfo.getMeasurementSchemaList();
-
-    if (isNeedInferType) {
-      try {
-        transferType(measurementSchemas);
-      } catch (QueryProcessException e) {
-        return false;
-      }
-    } else {
-      // todo partial insert
-      if (deviceSchemaInfo.isAligned() != isAligned) {
-        return false;
-      }
-
-      for (int i = 0; i < measurementSchemas.size(); i++) {
-        if (dataTypes[i] != measurementSchemas.get(i).getType()) {
-          if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
-            return false;
-          } else {
-            markFailedMeasurementInsertion(
-                i,
-                new DataTypeMismatchException(
-                    devicePath.getFullPath(),
-                    measurements[i],
-                    measurementSchemas.get(i).getType(),
-                    dataTypes[i]));
-          }
-        }
-      }
+    if (deviceSchemaInfo.isAligned() != isAligned) {
+      return false;
     }
+    this.measurementSchemas =
+        deviceSchemaInfo.getMeasurementSchemaList().toArray(new MeasurementSchema[0]);
 
-    // filter failed measurements
-    measurements = Arrays.stream(measurements).filter(Objects::nonNull).toArray(String[]::new);
-    dataTypes = Arrays.stream(dataTypes).filter(Objects::nonNull).toArray(TSDataType[]::new);
-    values = Arrays.stream(values).filter(Objects::nonNull).toArray(Object[]::new);
-
-    return true;
-  }
-
-  @Override
-  public void markFailedMeasurementInsertion(int index, Exception e) {
-    if (measurements[index] == null) {
-      return;
+    // transfer data types from string values when necessary
+    try {
+      transferType();
+    } catch (QueryProcessException e) {
+      return false;
     }
-    super.markFailedMeasurementInsertion(index, e);
-    values[index] = null;
-    dataTypes[index] = null;
+
+    // validate whether data types are matched
+    return selfCheckDataTypes();
   }
 
   /**
-   * if inferType is true, transfer String[] values to specific data types (Integer, Long, Float,
-   * Double, Binary)
+   * transfer String[] values to specific data types when isNeedInferType is true. <br>
+   * Notice: measurementSchemas must be initialized before calling this method
    */
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
-  public void transferType(List<MeasurementSchema> measurementSchemas)
-      throws QueryProcessException {
-    if (isNeedInferType) {
-      for (int i = 0; i < measurementSchemas.size(); i++) {
-        if (measurementSchemas.get(i) == null) {
-          if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
-            markFailedMeasurementInsertion(
-                i,
-                new QueryProcessException(
-                    new PathNotExistException(
-                        devicePath.getFullPath()
-                            + IoTDBConstant.PATH_SEPARATOR
-                            + measurements[i])));
-          } else {
-            throw new QueryProcessException(
-                new PathNotExistException(
-                    devicePath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i]));
-          }
-          continue;
-        }
+  private void transferType() throws QueryProcessException {
+    if (!isNeedInferType) {
+      return;
+    }
 
-        dataTypes[i] = measurementSchemas.get(i).getType();
-        try {
-          values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString());
-        } catch (Exception e) {
-          logger.warn(
-              "{}.{} data type is not consistent, input {}, registered {}",
-              devicePath,
-              measurements[i],
-              values[i],
-              dataTypes[i]);
-          if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
-            markFailedMeasurementInsertion(i, e);
-          } else {
-            throw e;
-          }
+    for (int i = 0; i < measurementSchemas.length; i++) {
+      // null when time series doesn't exist
+      if (measurementSchemas[i] == null) {
+        if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+          throw new QueryProcessException(
+              new PathNotExistException(
+                  devicePath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i]));
+        } else {
+          markFailedMeasurement(
+              i,
+              new QueryProcessException(
+                  new PathNotExistException(
+                      devicePath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i])));
+        }
+        continue;
+      }
+      // parse string value to specific type
+      dataTypes[i] = measurementSchemas[i].getType();
+      try {
+        values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString());
+      } catch (Exception e) {
+        logger.warn(
+            "{}.{} data type is not consistent, input {}, registered {}",
+            devicePath,
+            measurements[i],
+            values[i],
+            dataTypes[i]);
+        if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+          throw e;
+        } else {
+          markFailedMeasurement(i, e);
         }
       }
-      isNeedInferType = false;
     }
+    isNeedInferType = false;
   }
 
   @Override
-  public int serializedSize() {
-    int size = 0;
-    size += Short.BYTES;
-    return size + subSerializeSize();
-  }
-
-  int subSerializeSize() {
-    int size = 0;
-    size += Long.BYTES;
-    size += ReadWriteIOUtils.sizeToWrite(devicePath.getFullPath());
-    return size + serializeMeasurementsAndValuesSize();
-  }
-
-  int serializeMeasurementsAndValuesSize() {
-    int size = 0;
-    size += Integer.BYTES;
-
-    size += serializeMeasurementSchemaSize();
+  public void markFailedMeasurement(int index, Exception cause) {
+    if (measurements[index] == null) {
+      return;
+    }
 
-    // putValues
-    for (int i = 0; i < values.length; i++) {
-      if (dataTypes[i] != null) {
-        if (values[i] == null) {
-          size += Byte.BYTES;
-          continue;
-        }
-        size += Byte.BYTES;
-        switch (dataTypes[i]) {
-          case BOOLEAN:
-            size += Byte.BYTES;
-            break;
-          case INT32:
-            size += Integer.BYTES;
-            break;
-          case INT64:
-            size += Long.BYTES;
-            break;
-          case FLOAT:
-            size += Float.BYTES;
-            break;
-          case DOUBLE:
-            size += Double.BYTES;
-            break;
-          case TEXT:
-            size += ReadWriteIOUtils.sizeToWrite((Binary) values[i]);
-            break;
-        }
-      }
+    if (failedMeasurementIndex2Info == null) {
+      failedMeasurementIndex2Info = new HashMap<>();
     }
 
-    size += Byte.BYTES;
-    return size;
+    FailedMeasurementInfo failedMeasurementInfo =
+        new FailedMeasurementInfo(measurements[index], dataTypes[index], values[index], cause);
+    failedMeasurementIndex2Info.putIfAbsent(index, failedMeasurementInfo);
+
+    measurements[index] = null;
+    dataTypes[index] = null;
+    values[index] = null;
   }
 
   @Override
@@ -332,40 +263,47 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
     serializeMeasurementsAndValues(buffer);
   }
 
+  /** Serialize measurements and values, ignoring failed time series */
   void serializeMeasurementsAndValues(ByteBuffer buffer) {
-    buffer.putInt(measurements.length);
+    buffer.putInt(measurements.length - getFailedMeasurementNumber());
+    serializeMeasurementsOrSchemas(buffer);
+    putDataTypesAndValues(buffer);
+    buffer.put((byte) (isNeedInferType ? 1 : 0));
+    buffer.put((byte) (isAligned ? 1 : 0));
+  }
 
-    // check whether has measurement schemas or not
+  /** Serialize measurements or measurement schemas, ignoring failed time series */
+  private void serializeMeasurementsOrSchemas(ByteBuffer buffer) {
     buffer.put((byte) (measurementSchemas != null ? 1 : 0));
-    if (measurementSchemas != null) {
-      for (MeasurementSchema measurementSchema : measurementSchemas) {
-        measurementSchema.serializeTo(buffer);
+    for (int i = 0; i < measurements.length; i++) {
+      // ignore failed partial insert
+      if (measurements[i] == null) {
+        continue;
       }
-    } else {
-      for (String measurement : measurements) {
-        ReadWriteIOUtils.write(measurement, buffer);
+      // serialize measurement schemas when exist
+      if (measurementSchemas != null) {
+        measurementSchemas[i].serializeTo(buffer);
+      } else {
+        ReadWriteIOUtils.write(measurements[i], buffer);
       }
     }
-
-    try {
-      putValues(buffer);
-    } catch (QueryProcessException e) {
-      logger.error("Failed to serialize values for {}", this, e);
-    }
-
-    buffer.put((byte) (isNeedInferType ? 1 : 0));
-    buffer.put((byte) (isAligned ? 1 : 0));
   }
 
-  private void putValues(ByteBuffer buffer) throws QueryProcessException {
+  /** Serialize data types and values, ignoring failed time series */
+  private void putDataTypesAndValues(ByteBuffer buffer) {
     for (int i = 0; i < values.length; i++) {
+      // ignore failed partial insert
+      if (measurements[i] == null) {
+        continue;
+      }
+      // serialize null value
       if (values[i] == null) {
         ReadWriteIOUtils.write(TYPE_NULL, buffer);
         continue;
       }
       // types are not determined, the situation mainly occurs when the plan uses string values
       // and is forwarded to other nodes
-      if (dataTypes == null || dataTypes[i] == null) {
+      if (isNeedInferType) {
         ReadWriteIOUtils.write(TYPE_RAW_STRING, buffer);
         ReadWriteIOUtils.write(values[i].toString(), buffer);
       } else {
@@ -390,7 +328,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
             ReadWriteIOUtils.write((Binary) values[i], buffer);
             break;
           default:
-            throw new QueryProcessException("Unsupported data type:" + dataTypes[i]);
+            throw new RuntimeException("Unsupported data type:" + dataTypes[i]);
         }
       }
     }
@@ -404,7 +342,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
     return insertNode;
   }
 
-  public void subDeserialize(ByteBuffer byteBuffer) {
+  void subDeserialize(ByteBuffer byteBuffer) {
     time = byteBuffer.getLong();
     try {
       devicePath = new PartialPath(ReadWriteIOUtils.readString(byteBuffer));
@@ -420,7 +358,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
     this.measurements = new String[measurementSize];
     boolean hasSchema = buffer.get() == 1;
     if (hasSchema) {
-      this.measurementSchemas = new MeasurementSchema[measurementSize];
+      measurementSchemas = new MeasurementSchema[measurementSize];
       for (int i = 0; i < measurementSize; i++) {
         measurementSchemas[i] = MeasurementSchema.deserializeFrom(buffer);
         measurements[i] = measurementSchemas[i].getMeasurementId();
@@ -431,20 +369,16 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
       }
     }
 
-    this.dataTypes = new TSDataType[measurementSize];
-    this.values = new Object[measurementSize];
-    try {
-      fillValues(buffer);
-    } catch (QueryProcessException e) {
-      e.printStackTrace();
-    }
+    dataTypes = new TSDataType[measurementSize];
+    values = new Object[measurementSize];
+    fillDataTypesAndValues(buffer);
 
     isNeedInferType = buffer.get() == 1;
     isAligned = buffer.get() == 1;
   }
 
-  /** Make sure the values is already inited before calling this */
-  public void fillValues(ByteBuffer buffer) throws QueryProcessException {
+  /** Make sure the dataTypes and values have been created before calling this */
+  private void fillDataTypesAndValues(ByteBuffer buffer) {
     for (int i = 0; i < dataTypes.length; i++) {
       // types are not determined, the situation mainly occurs when the node uses string values
       // and is forwarded to other nodes
@@ -474,40 +408,97 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
           values[i] = ReadWriteIOUtils.readBinary(buffer);
           break;
         default:
-          throw new QueryProcessException("Unsupported data type:" + dataTypes[i]);
+          throw new RuntimeException("Unsupported data type:" + dataTypes[i]);
       }
     }
   }
 
+  // region serialize & deserialize methods for WAL
+  /** Serialized size for WAL: Short.BYTES for the node type tag plus the sub-serialized payload */
+  @Override
+  public int serializedSize() {
+    return Short.BYTES + subSerializeSize();
+  }
+
+  private int subSerializeSize() {
+    int size = 0;
+    size += Long.BYTES; // time
+    size += ReadWriteIOUtils.sizeToWrite(devicePath.getFullPath()); // device path
+    return size + serializeMeasurementsAndValuesSize();
+  }
+
+  private int serializeMeasurementsAndValuesSize() {
+    int size = 0; // WAL bytes for measurement count, schemas, values and the aligned flag
+    size += Integer.BYTES; // measurement count
+
+    size += serializeMeasurementSchemasSize();
+
+    // data types and values, mirroring putDataTypesAndValues
+    for (int i = 0; i < values.length; i++) {
+      // ignore failed partial insert
+      if (measurements[i] == null) {
+        continue;
+      }
+      // a null value costs a single byte here (the TYPE_NULL marker)
+      if (values[i] == null) {
+        size += Byte.BYTES;
+        continue;
+      }
+      size += Byte.BYTES; // NOTE(review): presumably the data type tag before the value - confirm
+      switch (dataTypes[i]) { // no default branch: any other type adds nothing to the size
+        case BOOLEAN:
+          size += Byte.BYTES;
+          break;
+        case INT32:
+          size += Integer.BYTES;
+          break;
+        case INT64:
+          size += Long.BYTES;
+          break;
+        case FLOAT:
+          size += Float.BYTES;
+          break;
+        case DOUBLE:
+          size += Double.BYTES;
+          break;
+        case TEXT:
+          size += ReadWriteIOUtils.sizeToWrite((Binary) values[i]);
+          break;
+      }
+    }
+
+    size += Byte.BYTES; // isAligned flag
+    return size;
+  }
+
   @Override
   public void serializeToWAL(IWALByteBufferView buffer) {
     buffer.putShort((short) PlanNodeType.INSERT_ROW.ordinal());
     subSerialize(buffer);
   }
 
-  void subSerialize(IWALByteBufferView buffer) {
+  private void subSerialize(IWALByteBufferView buffer) {
     buffer.putLong(time);
     WALWriteUtils.write(devicePath.getFullPath(), buffer);
     serializeMeasurementsAndValues(buffer);
   }
 
-  void serializeMeasurementsAndValues(IWALByteBufferView buffer) {
-    buffer.putInt(measurementSchemas.length);
-
-    serializeMeasurementSchemaToWAL(buffer);
-
-    try {
-      putValues(buffer);
-    } catch (QueryProcessException e) {
-      logger.error("Failed to serialize values for {}", this, e);
-    }
-
+  /** Serialize measurements and values, ignoring failed time series */
+  private void serializeMeasurementsAndValues(IWALByteBufferView buffer) {
+    buffer.putInt(measurementSchemas.length - getFailedMeasurementNumber());
+    serializeMeasurementSchemasToWAL(buffer);
+    putDataTypesAndValues(buffer);
     buffer.put((byte) (isAligned ? 1 : 0));
   }
 
-  private void putValues(IWALByteBufferView buffer) throws QueryProcessException {
-    // todo remove serialize datatype after serializing measurement schema
+  /** Serialize data types and values, ignoring failed time series */
+  private void putDataTypesAndValues(IWALByteBufferView buffer) {
     for (int i = 0; i < values.length; i++) {
+      // ignore failed partial insert
+      if (measurements[i] == null) {
+        continue;
+      }
+      // serialize null value
       if (values[i] == null) {
         WALWriteUtils.write(TYPE_NULL, buffer);
         continue;
@@ -533,14 +524,14 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
           WALWriteUtils.write((Binary) values[i], buffer);
           break;
         default:
-          throw new QueryProcessException("Unsupported data type:" + dataTypes[i]);
+          throw new RuntimeException("Unsupported data type:" + dataTypes[i]);
       }
     }
   }
 
+  /** Deserialize from wal */
   public static InsertRowNode deserialize(DataInputStream stream)
       throws IOException, IllegalPathException {
-    // This method is used for deserialize from wal
     // we do not store plan node id in wal entry
     InsertRowNode insertNode = new InsertRowNode(new PlanNodeId(""));
     insertNode.setTime(stream.readLong());
@@ -553,23 +544,19 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
   void deserializeMeasurementsAndValues(DataInputStream stream) throws IOException {
     int measurementSize = stream.readInt();
 
-    this.measurements = new String[measurementSize];
-    this.measurementSchemas = new MeasurementSchema[measurementSize];
-    deserializeMeasurementSchema(stream);
+    measurements = new String[measurementSize];
+    measurementSchemas = new MeasurementSchema[measurementSize];
+    deserializeMeasurementSchemas(stream);
 
-    this.dataTypes = new TSDataType[measurementSize];
-    this.values = new Object[measurementSize];
-    try {
-      fillValues(stream);
-    } catch (QueryProcessException e) {
-      e.printStackTrace();
-    }
+    dataTypes = new TSDataType[measurementSize];
+    values = new Object[measurementSize];
+    fillDataTypesAndValues(stream);
 
     isAligned = stream.readByte() == 1;
   }
 
-  /** Make sure the values is already inited before calling this */
-  public void fillValues(DataInputStream stream) throws QueryProcessException, IOException {
+  /** Make sure the dataTypes and values have been created before calling this */
+  public void fillDataTypesAndValues(DataInputStream stream) throws IOException {
     for (int i = 0; i < dataTypes.length; i++) {
       byte typeNum = stream.readByte();
       if (typeNum == TYPE_NULL) {
@@ -596,10 +583,11 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
           values[i] = ReadWriteIOUtils.readBinary(stream);
           break;
         default:
-          throw new QueryProcessException("Unsupported data type:" + dataTypes[i]);
+          throw new RuntimeException("Unsupported data type:" + dataTypes[i]);
       }
     }
   }
+  // endregion
 
   @Override
   public boolean equals(Object o) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
index f9dd6c9359..5ca6141a8d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
@@ -103,22 +103,15 @@ public class InsertRowsNode extends InsertNode implements BatchInsertNode {
   public void addChild(PlanNode child) {}
 
   @Override
-  public boolean validateSchema(SchemaTree schemaTree) {
+  public boolean validateAndSetSchema(SchemaTree schemaTree) {
     for (InsertRowNode insertRowNode : insertRowNodeList) {
-      if (!insertRowNode.validateSchema(schemaTree)) {
+      if (!insertRowNode.validateAndSetSchema(schemaTree)) {
         return false;
       }
     }
     return true;
   }
 
-  @Override
-  public void setMeasurementSchemas(SchemaTree schemaTree) {
-    for (InsertRowNode insertRowNode : insertRowNodeList) {
-      insertRowNode.setMeasurementSchemas(schemaTree);
-    }
-  }
-
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 24432e6064..eaa8d2f2a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -138,22 +138,15 @@ public class InsertRowsOfOneDeviceNode extends InsertNode implements BatchInsert
   }
 
   @Override
-  public boolean validateSchema(SchemaTree schemaTree) {
+  public boolean validateAndSetSchema(SchemaTree schemaTree) {
     for (InsertRowNode insertRowNode : insertRowNodeList) {
-      if (!insertRowNode.validateSchema(schemaTree)) {
+      if (!insertRowNode.validateAndSetSchema(schemaTree)) {
         return false;
       }
     }
     return true;
   }
 
-  @Override
-  public void setMeasurementSchemas(SchemaTree schemaTree) {
-    for (InsertRowNode insertRowNode : insertRowNodeList) {
-      insertRowNode.setMeasurementSchemas(schemaTree);
-    }
-  }
-
   @Override
   public List<WritePlanNode> splitByPartition(Analysis analysis) {
     List<WritePlanNode> result = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
index b7133a5ec7..24b560cc9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
@@ -23,9 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngineV2;
-import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
 import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
 import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
@@ -164,38 +162,17 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
   }
 
   @Override
-  public boolean validateSchema(SchemaTree schemaTree) {
+  public boolean validateAndSetSchema(SchemaTree schemaTree) {
     DeviceSchemaInfo deviceSchemaInfo =
         schemaTree.searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements));
-
-    // todo partial insert
     if (deviceSchemaInfo.isAligned() != isAligned) {
       return false;
     }
+    measurementSchemas =
+        deviceSchemaInfo.getMeasurementSchemaList().toArray(new MeasurementSchema[0]);
 
-    List<MeasurementSchema> measurementSchemas = deviceSchemaInfo.getMeasurementSchemaList();
-    for (int i = 0; i < measurementSchemas.size(); i++) {
-      if (dataTypes[i] != measurementSchemas.get(i).getType()) {
-        if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
-          return false;
-        } else {
-          markFailedMeasurementInsertion(
-              i,
-              new DataTypeMismatchException(
-                  devicePath.getFullPath(),
-                  measurements[i],
-                  measurementSchemas.get(i).getType(),
-                  dataTypes[i]));
-        }
-      }
-    }
-
-    // filter failed measurements
-    measurements = Arrays.stream(measurements).filter(Objects::nonNull).toArray(String[]::new);
-    dataTypes = Arrays.stream(dataTypes).filter(Objects::nonNull).toArray(TSDataType[]::new);
-    columns = Arrays.stream(columns).filter(Objects::nonNull).toArray(Object[]::new);
-
-    return true;
+    // validate whether data types are matched
+    return selfCheckDataTypes();
   }
 
   @Override
@@ -316,91 +293,58 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
     return result;
   }
 
-  @Override
-  public void markFailedMeasurementInsertion(int index, Exception e) {
-    if (measurements[index] == null) {
-      return;
+  private Object[] initTabletValues(int columnSize, int rowSize, TSDataType[] dataTypes) {
+    Object[] values = new Object[columnSize];
+    for (int i = 0; i < values.length; i++) {
+      switch (dataTypes[i]) {
+        case TEXT:
+          values[i] = new Binary[rowSize];
+          break;
+        case FLOAT:
+          values[i] = new float[rowSize];
+          break;
+        case INT32:
+          values[i] = new int[rowSize];
+          break;
+        case INT64:
+          values[i] = new long[rowSize];
+          break;
+        case DOUBLE:
+          values[i] = new double[rowSize];
+          break;
+        case BOOLEAN:
+          values[i] = new boolean[rowSize];
+          break;
+      }
     }
-    super.markFailedMeasurementInsertion(index, e);
-    dataTypes[index] = null;
-    columns[index] = null;
-    bitMaps[index] = null;
-  }
-
-  @Override
-  public int serializedSize() {
-    return serializedSize(0, rowCount);
+    return values;
   }
 
-  public int serializedSize(int start, int end) {
-    int size = 0;
-    size += Short.BYTES;
-    return size + subSerializeSize(start, end);
+  private BitMap[] initBitmaps(int columnSize, int rowSize) {
+    BitMap[] bitMaps = new BitMap[columnSize];
+    for (int i = 0; i < columnSize; i++) {
+      bitMaps[i] = new BitMap(rowSize);
+    }
+    return bitMaps;
   }
 
-  int subSerializeSize(int start, int end) {
-    int size = 0;
-    size += ReadWriteIOUtils.sizeToWrite(devicePath.getFullPath());
-    // measurements size
-    size += Integer.BYTES;
-
-    size += serializeMeasurementSchemaSize();
-
-    size += Byte.BYTES * dataTypes.length;
-
-    // times size
-    size += Integer.BYTES;
-    size += Long.BYTES * (end - start);
-    // bitmaps size
-    size += Byte.BYTES;
-    if (bitMaps != null) {
-      for (BitMap bitMap : bitMaps) {
-        size += Byte.BYTES;
-        if (bitMap != null) {
-          int len = end - start;
-          BitMap partBitMap = new BitMap(len);
-          BitMap.copyOfRange(bitMap, start, partBitMap, 0, len);
-          size += partBitMap.getByteArray().length;
-        }
-      }
+  @Override
+  public void markFailedMeasurement(int index, Exception cause) {
+    if (measurements[index] == null) {
+      return;
     }
-    // values size
-    for (int i = 0; i < dataTypes.length; i++) {
-      if (columns[i] != null) {
-        size += getColumnSize(dataTypes[i], columns[i], start, end);
-      }
+
+    if (failedMeasurementIndex2Info == null) {
+      failedMeasurementIndex2Info = new HashMap<>();
     }
 
-    size += Byte.BYTES;
-    return size;
-  }
+    FailedMeasurementInfo failedMeasurementInfo =
+        new FailedMeasurementInfo(measurements[index], dataTypes[index], columns[index], cause);
+    failedMeasurementIndex2Info.putIfAbsent(index, failedMeasurementInfo);
 
-  private int getColumnSize(TSDataType dataType, Object column, int start, int end) {
-    int size = 0;
-    switch (dataType) {
-      case INT32:
-        size += Integer.BYTES * (end - start);
-        break;
-      case INT64:
-        size += Long.BYTES * (end - start);
-        break;
-      case FLOAT:
-        size += Float.BYTES * (end - start);
-        break;
-      case DOUBLE:
-        size += Double.BYTES * (end - start);
-        break;
-      case BOOLEAN:
-        size += Byte.BYTES * (end - start);
-        break;
-      case TEXT:
-        Binary[] binaryValues = (Binary[]) column;
-        for (int j = start; j < end; j++) {
-          size += ReadWriteIOUtils.sizeToWrite(binaryValues[j]);
-        }
-        break;
-    }
-    return size;
+    measurements[index] = null;
+    dataTypes[index] = null;
+    columns[index] = null;
   }
 
   @Override
@@ -411,7 +355,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
 
   void subSerialize(ByteBuffer buffer) {
     ReadWriteIOUtils.write(devicePath.getFullPath(), buffer);
-    writeMeasurements(buffer);
+    writeMeasurementsOrSchemas(buffer);
     writeDataTypes(buffer);
     writeTimes(buffer);
     writeBitMaps(buffer);
@@ -419,26 +363,33 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
     buffer.put((byte) (isAligned ? 1 : 0));
   }
 
-  private void writeMeasurements(ByteBuffer buffer) {
-    buffer.putInt(measurements.length);
+  /** Serialize measurements or measurement schemas, ignoring failed time series */
+  private void writeMeasurementsOrSchemas(ByteBuffer buffer) {
+    buffer.putInt(measurements.length - getFailedMeasurementNumber());
 
-    // check whether has measurement schemas or not
     buffer.put((byte) (measurementSchemas != null ? 1 : 0));
-
-    if (measurementSchemas != null) {
-      for (MeasurementSchema measurement : measurementSchemas) {
-        measurement.serializeTo(buffer);
+    for (int i = 0; i < measurements.length; i++) {
+      // ignore failed partial insert
+      if (measurements[i] == null) {
+        continue;
       }
-    } else {
-      for (String measurement : measurements) {
-        ReadWriteIOUtils.write(measurement, buffer);
+      // serialize measurement schemas when exist
+      if (measurementSchemas != null) {
+        measurementSchemas[i].serializeTo(buffer);
+      } else {
+        ReadWriteIOUtils.write(measurements[i], buffer);
       }
     }
   }
 
+  /** Serialize data types, ignoring failed time series */
   private void writeDataTypes(ByteBuffer buffer) {
-    for (TSDataType dataType : dataTypes) {
-      dataType.serializeTo(buffer);
+    for (int i = 0; i < dataTypes.length; i++) {
+      // ignore failed partial insert
+      if (measurements[i] == null) {
+        continue;
+      }
+      dataTypes[i].serializeTo(buffer);
     }
   }
 
@@ -449,23 +400,33 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
     }
   }
 
+  /** Serialize bitmaps, ignoring failed time series */
   private void writeBitMaps(ByteBuffer buffer) {
     buffer.put(BytesUtils.boolToByte(bitMaps != null));
     if (bitMaps != null) {
-      for (int i = 0; i < measurements.length; i++) {
-        BitMap bitMap = bitMaps[i];
-        if (bitMap == null) {
+      for (int i = 0; i < bitMaps.length; i++) {
+        // ignore failed partial insert
+        if (measurements[i] == null) {
+          continue;
+        }
+
+        if (bitMaps[i] == null) {
           buffer.put(BytesUtils.boolToByte(false));
         } else {
           buffer.put(BytesUtils.boolToByte(true));
-          buffer.put(bitMap.getByteArray());
+          buffer.put(bitMaps[i].getByteArray());
         }
       }
     }
   }
 
+  /** Serialize values, ignoring failed time series */
   private void writeValues(ByteBuffer buffer) {
-    for (int i = 0; i < dataTypes.length; i++) {
+    for (int i = 0; i < columns.length; i++) {
+      // ignore failed partial insert
+      if (measurements[i] == null) {
+        continue;
+      }
       serializeColumn(dataTypes[i], columns[i], buffer);
     }
   }
@@ -514,6 +475,132 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
     }
   }
 
+  public static InsertTabletNode deserialize(ByteBuffer byteBuffer) {
+    InsertTabletNode insertNode = new InsertTabletNode(new PlanNodeId("")); // placeholder id
+    insertNode.subDeserialize(byteBuffer);
+    insertNode.setPlanNodeId(PlanNodeId.deserialize(byteBuffer)); // real id is read after the body
+    return insertNode;
+  }
+
+  public void subDeserialize(ByteBuffer buffer) {
+    try {
+      devicePath = new PartialPath(ReadWriteIOUtils.readString(buffer));
+    } catch (IllegalPathException e) {
+      throw new IllegalArgumentException("Cannot deserialize InsertTabletNode", e);
+    }
+
+    int measurementSize = buffer.getInt(); // number of serialized measurements
+    measurements = new String[measurementSize];
+
+    boolean hasSchema = buffer.get() == 1; // 1: full schemas follow, otherwise plain names
+    if (hasSchema) {
+      this.measurementSchemas = new MeasurementSchema[measurementSize];
+      for (int i = 0; i < measurementSize; i++) {
+        measurementSchemas[i] = MeasurementSchema.deserializeFrom(buffer);
+        measurements[i] = measurementSchemas[i].getMeasurementId();
+      }
+    } else {
+      for (int i = 0; i < measurementSize; i++) {
+        measurements[i] = ReadWriteIOUtils.readString(buffer);
+      }
+    }
+
+    dataTypes = new TSDataType[measurementSize];
+    for (int i = 0; i < measurementSize; i++) {
+      dataTypes[i] = TSDataType.deserialize(buffer.get()); // one byte per data type
+    }
+
+    rowCount = buffer.getInt();
+    times = new long[rowCount]; // NOTE(review): overwritten by the next statement
+    times = QueryDataSetUtils.readTimesFromBuffer(buffer, rowCount);
+
+    boolean hasBitMaps = BytesUtils.byteToBool(buffer.get()); // bitMaps is left untouched if false
+    if (hasBitMaps) {
+      bitMaps = QueryDataSetUtils.readBitMapsFromBuffer(buffer, measurementSize, rowCount);
+    }
+    columns =
+        QueryDataSetUtils.readTabletValuesFromBuffer(buffer, dataTypes, measurementSize, rowCount);
+    isAligned = buffer.get() == 1;
+  }
+
+  // region serialize & deserialize methods for WAL
+  /** Serialized size for WAL of the whole tablet, i.e. rows [0, rowCount) */
+  @Override
+  public int serializedSize() {
+    return serializedSize(0, rowCount);
+  }
+
+  /** Serialized size for WAL of rows [start, end): Short.BYTES plus subSerializeSize's result */
+  public int serializedSize(int start, int end) {
+    return Short.BYTES + subSerializeSize(start, end); // NOTE(review): short is likely the node type
+  }
+
+  int subSerializeSize(int start, int end) {
+    int size = 0;
+    size += ReadWriteIOUtils.sizeToWrite(devicePath.getFullPath());
+    // measurements size: the measurement count followed by the measurement schemas
+    size += Integer.BYTES;
+    size += serializeMeasurementSchemasSize();
+    // times size: a row count followed by one long per row in [start, end)
+    size += Integer.BYTES;
+    size += Long.BYTES * (end - start);
+    // bitmaps size: one flag byte telling whether any bitmaps are present
+    size += Byte.BYTES;
+    if (bitMaps != null) {
+      for (int i = 0; i < bitMaps.length; i++) {
+        // ignore failed partial insert
+        if (measurements[i] == null) {
+          continue;
+        }
+
+        size += Byte.BYTES; // flag: whether this column has a bitmap
+        if (bitMaps[i] != null) {
+          int len = end - start;
+          BitMap partBitMap = new BitMap(len); // copy made only to learn the slice's byte length
+          BitMap.copyOfRange(bitMaps[i], start, partBitMap, 0, len);
+          size += partBitMap.getByteArray().length;
+        }
+      }
+    }
+    // values size; columns of failed measurements are null and are therefore skipped
+    for (int i = 0; i < dataTypes.length; i++) {
+      if (columns[i] != null) {
+        size += getColumnSize(dataTypes[i], columns[i], start, end);
+      }
+    }
+
+    size += Byte.BYTES; // isAligned flag
+    return size;
+  }
+
+  private int getColumnSize(TSDataType dataType, Object column, int start, int end) {
+    int size = 0; // bytes needed for rows [start, end) of this column
+    switch (dataType) { // no default branch: any other type yields 0
+      case INT32:
+        size += Integer.BYTES * (end - start);
+        break;
+      case INT64:
+        size += Long.BYTES * (end - start);
+        break;
+      case FLOAT:
+        size += Float.BYTES * (end - start);
+        break;
+      case DOUBLE:
+        size += Double.BYTES * (end - start);
+        break;
+      case BOOLEAN:
+        size += Byte.BYTES * (end - start); // one byte per boolean
+        break;
+      case TEXT:
+        Binary[] binaryValues = (Binary[]) column; // variable length: sum each value's size
+        for (int j = start; j < end; j++) {
+          size += ReadWriteIOUtils.sizeToWrite(binaryValues[j]);
+        }
+        break;
+    }
+    return size;
+  }
+
   @Override
   public void serializeToWAL(IWALByteBufferView buffer) {
     serializeToWAL(buffer, 0, rowCount);
@@ -526,23 +613,18 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
 
   void subSerialize(IWALByteBufferView buffer, int start, int end) {
     WALWriteUtils.write(devicePath.getFullPath(), buffer);
-    writeMeasurements(buffer);
-    writeDataTypes(buffer);
+    // data types are serialized in measurement schemas
+    writeMeasurementSchemas(buffer);
     writeTimes(buffer, start, end);
     writeBitMaps(buffer, start, end);
     writeValues(buffer, start, end);
     buffer.put((byte) (isAligned ? 1 : 0));
   }
 
-  private void writeMeasurements(IWALByteBufferView buffer) {
-    buffer.putInt(measurementSchemas.length);
-    serializeMeasurementSchemaToWAL(buffer);
-  }
-
-  private void writeDataTypes(IWALByteBufferView buffer) {
-    for (TSDataType dataType : dataTypes) {
-      WALWriteUtils.write(dataType, buffer);
-    }
+  /** Serialize measurement schemas, ignoring failed time series */
+  private void writeMeasurementSchemas(IWALByteBufferView buffer) {
+    buffer.putInt(measurements.length - getFailedMeasurementNumber());
+    serializeMeasurementSchemasToWAL(buffer);
   }
 
   private void writeTimes(IWALByteBufferView buffer, int start, int end) {
@@ -552,26 +634,36 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
     }
   }
 
+  /** Serialize bitmaps, ignoring failed time series */
   private void writeBitMaps(IWALByteBufferView buffer, int start, int end) {
     buffer.put(BytesUtils.boolToByte(bitMaps != null));
     if (bitMaps != null) {
-      for (int i = 0; i < measurements.length; i++) {
-        BitMap bitMap = bitMaps[i];
-        if (bitMap == null) {
+      for (int i = 0; i < bitMaps.length; i++) {
+        // ignore failed partial insert
+        if (measurements[i] == null) {
+          continue;
+        }
+
+        if (bitMaps[i] == null) {
           buffer.put(BytesUtils.boolToByte(false));
         } else {
           buffer.put(BytesUtils.boolToByte(true));
           int len = end - start;
           BitMap partBitMap = new BitMap(len);
-          BitMap.copyOfRange(bitMap, start, partBitMap, 0, len);
+          BitMap.copyOfRange(bitMaps[i], start, partBitMap, 0, len);
           buffer.put(partBitMap.getByteArray());
         }
       }
     }
   }
 
+  /** Serialize values, ignoring failed time series */
   private void writeValues(IWALByteBufferView buffer, int start, int end) {
-    for (int i = 0; i < dataTypes.length; i++) {
+    for (int i = 0; i < columns.length; i++) {
+      // ignore failed partial insert
+      if (measurements[i] == null) {
+        continue;
+      }
       serializeColumn(dataTypes[i], columns[i], buffer, start, end);
     }
   }
@@ -621,94 +713,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
     }
   }
 
-  private Object[] initTabletValues(int columnSize, int rowSize, TSDataType[] dataTypes) {
-    Object[] values = new Object[columnSize];
-    for (int i = 0; i < values.length; i++) {
-      switch (dataTypes[i]) {
-        case TEXT:
-          values[i] = new Binary[rowSize];
-          break;
-        case FLOAT:
-          values[i] = new float[rowSize];
-          break;
-        case INT32:
-          values[i] = new int[rowSize];
-          break;
-        case INT64:
-          values[i] = new long[rowSize];
-          break;
-        case DOUBLE:
-          values[i] = new double[rowSize];
-          break;
-        case BOOLEAN:
-          values[i] = new boolean[rowSize];
-          break;
-      }
-    }
-    return values;
-  }
-
-  private BitMap[] initBitmaps(int columnSize, int rowSize) {
-    BitMap[] bitMaps = new BitMap[columnSize];
-    for (int i = 0; i < columnSize; i++) {
-      bitMaps[i] = new BitMap(rowSize);
-    }
-    return bitMaps;
-  }
-
-  public static InsertTabletNode deserialize(ByteBuffer byteBuffer) {
-    InsertTabletNode insertNode = new InsertTabletNode(new PlanNodeId(""));
-    insertNode.subDeserialize(byteBuffer);
-    insertNode.setPlanNodeId(PlanNodeId.deserialize(byteBuffer));
-    return insertNode;
-  }
-
-  public void subDeserialize(ByteBuffer buffer) {
-    try {
-      this.devicePath = new PartialPath(ReadWriteIOUtils.readString(buffer));
-    } catch (IllegalPathException e) {
-      throw new IllegalArgumentException("Cannot deserialize InsertTabletNode", e);
-    }
-
-    int measurementSize = buffer.getInt();
-    this.measurements = new String[measurementSize];
-
-    boolean hasSchema = buffer.get() == 1;
-
-    if (hasSchema) {
-      this.measurementSchemas = new MeasurementSchema[measurementSize];
-      for (int i = 0; i < measurementSize; i++) {
-        measurementSchemas[i] = MeasurementSchema.deserializeFrom(buffer);
-        measurements[i] = measurementSchemas[i].getMeasurementId();
-      }
-    } else {
-      for (int i = 0; i < measurementSize; i++) {
-        measurements[i] = ReadWriteIOUtils.readString(buffer);
-      }
-    }
-
-    this.dataTypes = new TSDataType[measurementSize];
-    for (int i = 0; i < measurementSize; i++) {
-      dataTypes[i] = TSDataType.deserialize(buffer.get());
-    }
-
-    int rows = buffer.getInt();
-    rowCount = rows;
-    this.times = new long[rows];
-    times = QueryDataSetUtils.readTimesFromBuffer(buffer, rows);
-
-    boolean hasBitMaps = BytesUtils.byteToBool(buffer.get());
-    if (hasBitMaps) {
-      bitMaps = QueryDataSetUtils.readBitMapsFromBuffer(buffer, measurementSize, rows);
-    }
-    columns =
-        QueryDataSetUtils.readTabletValuesFromBuffer(buffer, dataTypes, measurementSize, rows);
-    this.isAligned = buffer.get() == 1;
-  }
-
+  /** Deserialize from wal */
   public static InsertTabletNode deserialize(DataInputStream stream)
       throws IllegalPathException, IOException {
-    // This method is used for deserialize from wal
     // we do not store plan node id in wal entry
     InsertTabletNode insertNode = new InsertTabletNode(new PlanNodeId(""));
     insertNode.subDeserialize(stream);
@@ -716,31 +723,32 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
   }
 
   private void subDeserialize(DataInputStream stream) throws IllegalPathException, IOException {
-    this.devicePath = new PartialPath(ReadWriteIOUtils.readString(stream));
+    devicePath = new PartialPath(ReadWriteIOUtils.readString(stream));
 
     int measurementSize = stream.readInt();
-    this.measurements = new String[measurementSize];
-    this.measurementSchemas = new MeasurementSchema[measurementSize];
-    deserializeMeasurementSchema(stream);
+    measurements = new String[measurementSize];
+    measurementSchemas = new MeasurementSchema[measurementSize];
+    deserializeMeasurementSchemas(stream);
 
-    this.dataTypes = new TSDataType[measurementSize];
+    // data types are serialized in measurement schemas
+    dataTypes = new TSDataType[measurementSize];
     for (int i = 0; i < measurementSize; i++) {
-      dataTypes[i] = TSDataType.deserialize(stream.readByte());
+      dataTypes[i] = measurementSchemas[i].getType();
     }
 
-    int rows = stream.readInt();
-    rowCount = rows;
-    this.times = new long[rows];
-    times = QueryDataSetUtils.readTimesFromStream(stream, rows);
+    rowCount = stream.readInt();
+    times = new long[rowCount];
+    times = QueryDataSetUtils.readTimesFromStream(stream, rowCount);
 
     boolean hasBitMaps = BytesUtils.byteToBool(stream.readByte());
     if (hasBitMaps) {
-      bitMaps = QueryDataSetUtils.readBitMapsFromStream(stream, measurementSize, rows);
+      bitMaps = QueryDataSetUtils.readBitMapsFromStream(stream, measurementSize, rowCount);
     }
     columns =
-        QueryDataSetUtils.readTabletValuesFromStream(stream, dataTypes, measurementSize, rows);
-    this.isAligned = stream.readByte() == 1;
+        QueryDataSetUtils.readTabletValuesFromStream(stream, dataTypes, measurementSize, rowCount);
+    isAligned = stream.readByte() == 1;
   }
+  // endregion
 
   @Override
   public int hashCode() {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index b3d145ca83..f8b5778611 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -41,7 +41,6 @@ import org.apache.iotdb.db.exception.DataRegionException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
@@ -106,8 +105,7 @@ public class InternalServiceImpl implements InternalService.Iface {
         PlanNode planNode = fragmentInstance.getFragment().getRoot();
         if (planNode instanceof InsertNode) {
           try {
-            SchemaTree schemaTree = SchemaValidator.validate((InsertNode) planNode);
-            ((InsertNode) planNode).setMeasurementSchemas(schemaTree);
+            SchemaValidator.validate((InsertNode) planNode);
           } catch (SemanticException e) {
             response.setAccepted(false);
             response.setMessage(e.getMessage());
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
index 7551509bca..61be14f55e 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
@@ -62,12 +62,14 @@ public class MemUtils {
    */
   public static long getRecordsSize(
       List<TSDataType> dataTypes, Object[] value, boolean addingTextDataSize) {
+    int emptyRecordCount = 0;
     long memSize = 0L;
-    for (int i = 0; i < dataTypes.size(); i++) {
+    for (int i = 0; i < value.length; i++) {
       if (value[i] == null) {
+        emptyRecordCount++;
         continue;
       }
-      memSize += getRecordSize(dataTypes.get(i), value[i], addingTextDataSize);
+      memSize += getRecordSize(dataTypes.get(i - emptyRecordCount), value[i], addingTextDataSize);
     }
     return memSize;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java
index 2fd1ae18b9..5eb016ced7 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java
@@ -173,4 +173,21 @@ public class WALWriteUtils {
     }
     return len;
   }
+
+  /** Sums the byte lengths of a schema's id, three bytes, one int and every props key/value. */
+  public static int sizeToWrite(MeasurementSchema measurementSchema) {
+    // fixed-width part; presumably type/encoding/compressor bytes plus props count - confirm
+    int total = 3 * Byte.BYTES + Integer.BYTES;
+    total += ReadWriteIOUtils.sizeToWrite(measurementSchema.getMeasurementId());
+    Map<String, String> schemaProps = measurementSchema.getProps();
+    if (schemaProps == null) {
+      return total;
+    }
+    for (Map.Entry<String, String> prop : schemaProps.entrySet()) {
+      // each entry contributes its key and its value
+      total += ReadWriteIOUtils.sizeToWrite(prop.getKey());
+      total += ReadWriteIOUtils.sizeToWrite(prop.getValue());
+    }
+    return total;
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertMultiTabletsNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertMultiTabletsNodeSerdeTest.java
index 0fe2113d57..97b00cb218 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertMultiTabletsNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertMultiTabletsNodeSerdeTest.java
@@ -21,28 +21,22 @@ package org.apache.iotdb.db.mpp.plan.plan.node.write;
 
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
-import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 
 public class InsertMultiTabletsNodeSerdeTest {
 
   @Test
-  public void testInsertMultiTabletPlan()
-      throws QueryProcessException, MetadataException, InterruptedException,
-          QueryFilterOptimizationException, StorageEngineException, IOException {
+  public void testInsertMultiTabletPlan() throws MetadataException {
     long[] times = new long[] {110L, 111L, 112L, 113L};
     TSDataType[] dataTypes =
         new TSDataType[] {
@@ -96,6 +90,6 @@ public class InsertMultiTabletsNodeSerdeTest {
 
     Assert.assertEquals(PlanNodeType.INSERT_MULTI_TABLET.ordinal(), byteBuffer.getShort());
 
-    Assert.assertEquals(InsertMultiTabletsNode.deserialize(byteBuffer), insertMultiTabletsNode);
+    Assert.assertEquals(insertMultiTabletsNode, InsertMultiTabletsNode.deserialize(byteBuffer));
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowNodeSerdeTest.java
index 0dd4ae51f3..12a8120013 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowNodeSerdeTest.java
@@ -48,7 +48,7 @@ public class InsertRowNodeSerdeTest {
 
     Assert.assertEquals(PlanNodeType.INSERT_ROW.ordinal(), byteBuffer.getShort());
 
-    Assert.assertEquals(InsertRowNode.deserialize(byteBuffer), insertRowNode);
+    Assert.assertEquals(insertRowNode, InsertRowNode.deserialize(byteBuffer));
 
     insertRowNode = getInsertRowNodeWithMeasurementSchemas();
     byteBuffer = ByteBuffer.allocate(10000);
@@ -57,7 +57,7 @@ public class InsertRowNodeSerdeTest {
 
     Assert.assertEquals(PlanNodeType.INSERT_ROW.ordinal(), byteBuffer.getShort());
 
-    Assert.assertEquals(InsertRowNode.deserialize(byteBuffer), insertRowNode);
+    Assert.assertEquals(insertRowNode, InsertRowNode.deserialize(byteBuffer));
 
     insertRowNode = getInsertRowNodeWithStringValue();
     byteBuffer = ByteBuffer.allocate(10000);
@@ -66,7 +66,7 @@ public class InsertRowNodeSerdeTest {
 
     Assert.assertEquals(PlanNodeType.INSERT_ROW.ordinal(), byteBuffer.getShort());
 
-    Assert.assertEquals(InsertRowNode.deserialize(byteBuffer), insertRowNode);
+    Assert.assertEquals(insertRowNode, InsertRowNode.deserialize(byteBuffer));
   }
 
   @Test
@@ -75,25 +75,20 @@ public class InsertRowNodeSerdeTest {
 
     int serializedSize = insertRowNode.serializedSize();
 
-    Assert.assertEquals(serializedSize, 125);
-
     byte[] bytes = new byte[serializedSize];
     WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes));
 
     insertRowNode.serializeToWAL(walBuffer);
+    Assert.assertFalse(walBuffer.getBuffer().hasRemaining());
 
     DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes));
 
     Assert.assertEquals(PlanNodeType.INSERT_ROW.ordinal(), dataInputStream.readShort());
 
     InsertRowNode tmpNode = InsertRowNode.deserialize(dataInputStream);
+    tmpNode.setPlanNodeId(insertRowNode.getPlanNodeId());
 
-    Assert.assertEquals(tmpNode.getTime(), insertRowNode.getTime());
-    Assert.assertEquals(tmpNode.getDevicePath(), insertRowNode.getDevicePath());
-    Assert.assertEquals(tmpNode.isAligned(), insertRowNode.isAligned());
-    Assert.assertArrayEquals(tmpNode.getValues(), insertRowNode.getValues());
-    Assert.assertArrayEquals(
-        tmpNode.getMeasurementSchemas(), insertRowNode.getMeasurementSchemas());
+    Assert.assertEquals(insertRowNode, tmpNode);
   }
 
   private InsertRowNode getInsertRowNode() throws IllegalPathException {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowsNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowsNodeSerdeTest.java
index 4c3744e2f2..1e3033367e 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowsNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowsNodeSerdeTest.java
@@ -67,6 +67,6 @@ public class InsertRowsNodeSerdeTest {
 
     Assert.assertEquals(PlanNodeType.INSERT_ROWS.ordinal(), byteBuffer.getShort());
 
-    Assert.assertEquals(InsertRowsNode.deserialize(byteBuffer), node);
+    Assert.assertEquals(node, InsertRowsNode.deserialize(byteBuffer));
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java
index f657b7cc03..155f80c116 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java
@@ -76,6 +76,6 @@ public class InsertRowsOfOneDeviceNodeSerdeTest {
 
     Assert.assertEquals(PlanNodeType.INSERT_ROWS_OF_ONE_DEVICE.ordinal(), byteBuffer.getShort());
 
-    Assert.assertEquals(InsertRowsOfOneDeviceNode.deserialize(byteBuffer), node);
+    Assert.assertEquals(node, InsertRowsOfOneDeviceNode.deserialize(byteBuffer));
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java
index 0dfeae490e..b1c73bc080 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java
@@ -48,7 +48,7 @@ public class InsertTabletNodeSerdeTest {
 
     Assert.assertEquals(PlanNodeType.INSERT_TABLET.ordinal(), byteBuffer.getShort());
 
-    Assert.assertEquals(InsertTabletNode.deserialize(byteBuffer), insertTabletNode);
+    Assert.assertEquals(insertTabletNode, InsertTabletNode.deserialize(byteBuffer));
 
     insertTabletNode = getInsertTabletNodeWithSchema();
     byteBuffer = ByteBuffer.allocate(10000);
@@ -57,7 +57,7 @@ public class InsertTabletNodeSerdeTest {
 
     Assert.assertEquals(PlanNodeType.INSERT_TABLET.ordinal(), byteBuffer.getShort());
 
-    Assert.assertEquals(InsertTabletNode.deserialize(byteBuffer), insertTabletNode);
+    Assert.assertEquals(insertTabletNode, InsertTabletNode.deserialize(byteBuffer));
   }
 
   @Test
@@ -66,26 +66,20 @@ public class InsertTabletNodeSerdeTest {
 
     int serializedSize = insertTabletNode.serializedSize();
 
-    Assert.assertEquals(229, serializedSize);
-
     byte[] bytes = new byte[serializedSize];
     WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes));
 
     insertTabletNode.serializeToWAL(walBuffer);
+    Assert.assertFalse(walBuffer.getBuffer().hasRemaining());
 
     DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes));
 
     Assert.assertEquals(PlanNodeType.INSERT_TABLET.ordinal(), dataInputStream.readShort());
 
     InsertTabletNode tmpNode = InsertTabletNode.deserialize(dataInputStream);
+    tmpNode.setPlanNodeId(insertTabletNode.getPlanNodeId());
 
-    Assert.assertArrayEquals(tmpNode.getTimes(), insertTabletNode.getTimes());
-    Assert.assertEquals(tmpNode.getDevicePath(), insertTabletNode.getDevicePath());
-    Assert.assertEquals(tmpNode.isAligned(), insertTabletNode.isAligned());
-    Assert.assertArrayEquals(tmpNode.getColumns(), insertTabletNode.getColumns());
-    Assert.assertArrayEquals(tmpNode.getBitMaps(), insertTabletNode.getBitMaps());
-    Assert.assertArrayEquals(
-        tmpNode.getMeasurementSchemas(), insertTabletNode.getMeasurementSchemas());
+    Assert.assertEquals(insertTabletNode, tmpNode);
   }
 
   private InsertTabletNode getInsertTabletNode() throws IllegalPathException {
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java b/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java
index 670b8a7dcb..d774b36635 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.db.wal.io;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
@@ -30,6 +33,7 @@ import org.apache.iotdb.db.wal.utils.WALByteBufferForTest;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.junit.After;
 import org.junit.Before;
@@ -66,6 +70,8 @@ public class WALFileTest {
   public void testReadNormalFile() throws IOException, IllegalPathException {
     int fakeMemTableId = 1;
     List<WALEntry> expectedWALEntries = new ArrayList<>();
+    expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertRowNode(devicePath)));
+    expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertTabletNode(devicePath)));
     expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertRowPlan(devicePath)));
     expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertTabletPlan(devicePath)));
     expectedWALEntries.add(new WALEntry(fakeMemTableId, getDeletePlan(devicePath)));
@@ -111,6 +117,8 @@ public class WALFileTest {
   public void testReadBrokenFile() throws IOException, IllegalPathException {
     int fakeMemTableId = 1;
     List<WALEntry> expectedWALEntries = new ArrayList<>();
+    expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertRowNode(devicePath)));
+    expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertTabletNode(devicePath)));
     expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertRowPlan(devicePath)));
     expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertTabletPlan(devicePath)));
     expectedWALEntries.add(new WALEntry(fakeMemTableId, getDeletePlan(devicePath)));
@@ -217,6 +225,112 @@ public class WALFileTest {
     return insertTabletPlan;
   }
 
+  /**
+   * Creates an {@link InsertRowNode} fixture for {@code devicePath} holding one row at time 110
+   * with six measurements (s1..s6) of six different data types, and matching schemas attached.
+   *
+   * @param devicePath device path string, wrapped in a {@link PartialPath}
+   * @return the populated node
+   * @throws IllegalPathException presumably when {@code devicePath} cannot be parsed as a path
+   */
+  public static InsertRowNode getInsertRowNode(String devicePath) throws IllegalPathException {
+    String[] measurements = new String[] {"s1", "s2", "s3", "s4", "s5", "s6"};
+    TSDataType[] dataTypes =
+        new TSDataType[] {
+          TSDataType.DOUBLE,
+          TSDataType.FLOAT,
+          TSDataType.INT64,
+          TSDataType.INT32,
+          TSDataType.BOOLEAN,
+          TSDataType.TEXT
+        };
+    // one value per measurement, listed in the same order as dataTypes
+    Object[] values = new Object[] {1.0, 2.0f, 10000L, 100, false, new Binary("hh" + 0)};
+
+    // schemas pair each measurement name with the data type at the same index
+    MeasurementSchema[] schemas = new MeasurementSchema[measurements.length];
+    for (int i = 0; i < schemas.length; i++) {
+      schemas[i] = new MeasurementSchema(measurements[i], dataTypes[i]);
+    }
+
+    InsertRowNode rowNode =
+        new InsertRowNode(
+            new PlanNodeId(""),
+            new PartialPath(devicePath),
+            false,
+            measurements,
+            dataTypes,
+            110L,
+            values,
+            false);
+
+    rowNode.setMeasurementSchemas(schemas);
+    return rowNode;
+  }
+
+  /**
+   * Creates an {@link InsertTabletNode} fixture for {@code devicePath} with four rows (times
+   * 110..113) and six measurement columns (s1..s6), each with one position marked in its bitmap.
+   *
+   * @param devicePath device path string, wrapped in a {@link PartialPath}
+   * @return the populated node with measurement schemas attached
+   * @throws IllegalPathException presumably when {@code devicePath} cannot be parsed as a path
+   */
+  public static InsertTabletNode getInsertTabletNode(String devicePath)
+      throws IllegalPathException {
+    long[] times = new long[] {110L, 111L, 112L, 113L};
+    int rowCount = times.length;
+    String[] measurements = new String[] {"s1", "s2", "s3", "s4", "s5", "s6"};
+    TSDataType[] dataTypes =
+        new TSDataType[] {
+          TSDataType.DOUBLE,
+          TSDataType.FLOAT,
+          TSDataType.INT64,
+          TSDataType.INT32,
+          TSDataType.BOOLEAN,
+          TSDataType.TEXT
+        };
+
+    double[] doubles = new double[rowCount];
+    float[] floats = new float[rowCount];
+    long[] longs = new long[rowCount];
+    int[] ints = new int[rowCount];
+    boolean[] booleans = new boolean[rowCount];
+    Binary[] texts = new Binary[rowCount];
+    for (int row = 0; row < rowCount; row++) {
+      doubles[row] = 1.0 + row;
+      floats[row] = 2 + row;
+      longs[row] = 10000 + row;
+      ints[row] = 100 + row;
+      booleans[row] = (row % 2 == 0);
+      texts[row] = new Binary("hh" + row);
+    }
+    Object[] columns = new Object[] {doubles, floats, longs, ints, booleans, texts};
+
+    // every column gets its own bitmap with position (column index % row count) marked
+    BitMap[] bitMaps = new BitMap[dataTypes.length];
+    MeasurementSchema[] schemas = new MeasurementSchema[measurements.length];
+    for (int col = 0; col < dataTypes.length; col++) {
+      bitMaps[col] = new BitMap(rowCount);
+      bitMaps[col].mark(col % rowCount);
+      schemas[col] = new MeasurementSchema(measurements[col], dataTypes[col]);
+    }
+
+    InsertTabletNode tabletNode =
+        new InsertTabletNode(
+            new PlanNodeId(""),
+            new PartialPath(devicePath),
+            false,
+            measurements,
+            dataTypes,
+            times,
+            bitMaps,
+            columns,
+            rowCount);
+    tabletNode.setMeasurementSchemas(schemas);
+    return tabletNode;
+  }
+
   public static DeletePlan getDeletePlan(String devicePath) throws IllegalPathException {
     return new DeletePlan(Long.MIN_VALUE, Long.MAX_VALUE, new PartialPath(devicePath));
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BitMap.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BitMap.java
index 97ed0b7397..747ea23323 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BitMap.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BitMap.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.tsfile.utils;
 
 import java.util.Arrays;
+import java.util.Objects;
 
 public class BitMap {
   private static final byte[] BIT_UTIL = new byte[] {1, 2, 4, 8, 16, 32, 64, -128};
@@ -124,6 +125,28 @@ public class BitMap {
     return res.toString();
   }
 
+  /** Hash over {@code size} and the contents of {@code bits}, matching {@link #equals(Object)}. */
+  @Override
+  public int hashCode() {
+    // same two-step combination: hash of size first, then fold in the bit array contents
+    return 31 * Objects.hash(size) + Arrays.hashCode(bits);
+  }
+
+  /** Bitmaps are equal when their {@code size} and {@code bits} contents are the same. */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    // instanceof is false for null, so this one test rejects null and foreign types alike
+    if (!(obj instanceof BitMap)) {
+      return false;
+    }
+    BitMap that = (BitMap) obj;
+    // compare the cheap scalar first, then the array contents
+    return size == that.size && Arrays.equals(bits, that.bits);
+  }
+
   @Override
   public BitMap clone() {
     byte[] cloneBytes = new byte[this.bits.length];