You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/04/22 09:18:05 UTC

[iotdb] branch master updated: Serialize measurement schema of insert node to wal entry (#5638)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a547fef5ca Serialize measurement schema of insert node to wal entry (#5638)
a547fef5ca is described below

commit a547fef5ca846fcde5d374d4b4857ed357ebcfcd
Author: Mrquan <50...@users.noreply.github.com>
AuthorDate: Fri Apr 22 17:18:01 2022 +0800

    Serialize measurement schema of insert node to wal entry (#5638)
---
 .../apache/iotdb/db/mpp/sql/analyze/Analyzer.java  | 328 ++++++++++-----------
 .../db/mpp/sql/planner/plan/node/PlanNodeId.java   |  16 -
 .../sql/planner/plan/node/write/InsertNode.java    |  81 +++++
 .../sql/planner/plan/node/write/InsertRowNode.java |  34 +--
 .../planner/plan/node/write/InsertTabletNode.java  |  34 +--
 .../apache/iotdb/db/wal/utils/WALWriteUtils.java   |  12 +
 6 files changed, 283 insertions(+), 222 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
index 7bfec1905e..56c587ad85 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
@@ -325,6 +325,170 @@ public class Analyzer {
       return analysis;
     }
 
+    @Override
+    public Analysis visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
+      // TODO remove duplicate
+      SchemaTree schemaTree =
+          schemaFetcher.fetchSchemaWithAutoCreate(
+              insertRowStatement.getDevicePath(),
+              insertRowStatement.getMeasurements(),
+              insertRowStatement.getDataTypes(),
+              insertRowStatement.isAligned());
+
+      try {
+        insertRowStatement.transferType(schemaTree);
+      } catch (QueryProcessException e) {
+        throw new SemanticException(e.getMessage());
+      }
+
+      if (!insertRowStatement.checkDataType(schemaTree)) {
+        throw new SemanticException("Data type mismatch");
+      }
+
+      Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
+      DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
+      dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath());
+      dataPartitionQueryParam.setTimePartitionSlotList(insertRowStatement.getTimePartitionSlots());
+      sgNameToQueryParamsMap.put(
+          schemaTree.getBelongedStorageGroup(insertRowStatement.getDevicePath()),
+          Collections.singletonList(dataPartitionQueryParam));
+      DataPartition dataPartition =
+          partitionFetcher.getOrCreateDataPartition(sgNameToQueryParamsMap);
+
+      Analysis analysis = new Analysis();
+      analysis.setSchemaTree(schemaTree);
+      analysis.setStatement(insertRowStatement);
+      analysis.setDataPartitionInfo(dataPartition);
+
+      return analysis;
+    }
+
+    @Override
+    public Analysis visitInsertRows(
+        InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
+      // TODO remove duplicate
+      SchemaTree schemaTree =
+          schemaFetcher.fetchSchemaListWithAutoCreate(
+              insertRowsStatement.getDevicePaths(),
+              insertRowsStatement.getMeasurementsList(),
+              insertRowsStatement.getDataTypesList(),
+              insertRowsStatement.getAlignedList());
+
+      try {
+        insertRowsStatement.transferType(schemaTree);
+      } catch (QueryProcessException e) {
+        throw new SemanticException(e.getMessage());
+      }
+
+      if (!insertRowsStatement.checkDataType(schemaTree)) {
+        throw new SemanticException("Data type mismatch");
+      }
+
+      Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
+      for (InsertRowStatement insertRowStatement :
+          insertRowsStatement.getInsertRowStatementList()) {
+        DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
+        dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath());
+        dataPartitionQueryParam.setTimePartitionSlotList(
+            insertRowStatement.getTimePartitionSlots());
+        sgNameToQueryParamsMap
+            .computeIfAbsent(
+                schemaTree.getBelongedStorageGroup(insertRowStatement.getDevicePath()),
+                key -> new ArrayList<>())
+            .add(dataPartitionQueryParam);
+      }
+      DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+
+      Analysis analysis = new Analysis();
+      analysis.setSchemaTree(schemaTree);
+      analysis.setStatement(insertRowsStatement);
+      analysis.setDataPartitionInfo(dataPartition);
+
+      return analysis;
+    }
+
+    @Override
+    public Analysis visitInsertMultiTablets(
+        InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
+      // TODO remove duplicate
+      SchemaTree schemaTree =
+          schemaFetcher.fetchSchemaListWithAutoCreate(
+              insertMultiTabletsStatement.getDevicePaths(),
+              insertMultiTabletsStatement.getMeasurementsList(),
+              insertMultiTabletsStatement.getDataTypesList(),
+              insertMultiTabletsStatement.getAlignedList());
+
+      if (!insertMultiTabletsStatement.checkDataType(schemaTree)) {
+        throw new SemanticException("Data type mismatch");
+      }
+
+      Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
+      for (InsertTabletStatement insertTabletStatement :
+          insertMultiTabletsStatement.getInsertTabletStatementList()) {
+        DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
+        dataPartitionQueryParam.setDevicePath(insertTabletStatement.getDevicePath().getFullPath());
+        dataPartitionQueryParam.setTimePartitionSlotList(
+            insertTabletStatement.getTimePartitionSlots());
+        sgNameToQueryParamsMap
+            .computeIfAbsent(
+                schemaTree.getBelongedStorageGroup(insertTabletStatement.getDevicePath()),
+                key -> new ArrayList<>())
+            .add(dataPartitionQueryParam);
+      }
+      DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+
+      Analysis analysis = new Analysis();
+      analysis.setSchemaTree(schemaTree);
+      analysis.setStatement(insertMultiTabletsStatement);
+      analysis.setDataPartitionInfo(dataPartition);
+
+      return analysis;
+    }
+
+    @Override
+    public Analysis visitInsertRowsOfOneDevice(
+        InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, MPPQueryContext context) {
+      context.setQueryType(QueryType.WRITE);
+      // TODO remove duplicate
+      SchemaTree schemaTree =
+          schemaFetcher.fetchSchemaWithAutoCreate(
+              insertRowsOfOneDeviceStatement.getDevicePath(),
+              insertRowsOfOneDeviceStatement.getMeasurements(),
+              insertRowsOfOneDeviceStatement.getDataTypes(),
+              insertRowsOfOneDeviceStatement.isAligned());
+
+      try {
+        insertRowsOfOneDeviceStatement.transferType(schemaTree);
+      } catch (QueryProcessException e) {
+        throw new SemanticException(e.getMessage());
+      }
+
+      if (!insertRowsOfOneDeviceStatement.checkDataType(schemaTree)) {
+        throw new SemanticException("Data type mismatch");
+      }
+
+      Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
+      DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
+      dataPartitionQueryParam.setDevicePath(
+          insertRowsOfOneDeviceStatement.getDevicePath().getFullPath());
+      dataPartitionQueryParam.setTimePartitionSlotList(
+          insertRowsOfOneDeviceStatement.getTimePartitionSlots());
+      sgNameToQueryParamsMap.put(
+          schemaTree.getBelongedStorageGroup(insertRowsOfOneDeviceStatement.getDevicePath()),
+          Collections.singletonList(dataPartitionQueryParam));
+      DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+
+      Analysis analysis = new Analysis();
+      analysis.setSchemaTree(schemaTree);
+      analysis.setStatement(insertRowsOfOneDeviceStatement);
+      analysis.setDataPartitionInfo(dataPartition);
+
+      return analysis;
+    }
+
     @Override
     public Analysis visitShowTimeSeries(
         ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext context) {
@@ -517,170 +681,6 @@ public class Analyzer {
       return analysis;
     }
 
-    @Override
-    public Analysis visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryContext context) {
-      context.setQueryType(QueryType.WRITE);
-      // TODO remove duplicate
-      SchemaTree schemaTree =
-          schemaFetcher.fetchSchemaWithAutoCreate(
-              insertRowStatement.getDevicePath(),
-              insertRowStatement.getMeasurements(),
-              insertRowStatement.getDataTypes(),
-              insertRowStatement.isAligned());
-
-      try {
-        insertRowStatement.transferType(schemaTree);
-      } catch (QueryProcessException e) {
-        throw new SemanticException(e.getMessage());
-      }
-
-      if (!insertRowStatement.checkDataType(schemaTree)) {
-        throw new SemanticException("Data type mismatch");
-      }
-
-      Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
-      DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
-      dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath());
-      dataPartitionQueryParam.setTimePartitionSlotList(insertRowStatement.getTimePartitionSlots());
-      sgNameToQueryParamsMap.put(
-          schemaTree.getBelongedStorageGroup(insertRowStatement.getDevicePath()),
-          Collections.singletonList(dataPartitionQueryParam));
-      DataPartition dataPartition =
-          partitionFetcher.getOrCreateDataPartition(sgNameToQueryParamsMap);
-
-      Analysis analysis = new Analysis();
-      analysis.setSchemaTree(schemaTree);
-      analysis.setStatement(insertRowStatement);
-      analysis.setDataPartitionInfo(dataPartition);
-
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitInsertRows(
-        InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
-      context.setQueryType(QueryType.WRITE);
-      // TODO remove duplicate
-      SchemaTree schemaTree =
-          schemaFetcher.fetchSchemaListWithAutoCreate(
-              insertRowsStatement.getDevicePaths(),
-              insertRowsStatement.getMeasurementsList(),
-              insertRowsStatement.getDataTypesList(),
-              insertRowsStatement.getAlignedList());
-
-      try {
-        insertRowsStatement.transferType(schemaTree);
-      } catch (QueryProcessException e) {
-        throw new SemanticException(e.getMessage());
-      }
-
-      if (!insertRowsStatement.checkDataType(schemaTree)) {
-        throw new SemanticException("Data type mismatch");
-      }
-
-      Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
-      for (InsertRowStatement insertRowStatement :
-          insertRowsStatement.getInsertRowStatementList()) {
-        DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
-        dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath());
-        dataPartitionQueryParam.setTimePartitionSlotList(
-            insertRowStatement.getTimePartitionSlots());
-        sgNameToQueryParamsMap
-            .computeIfAbsent(
-                schemaTree.getBelongedStorageGroup(insertRowStatement.getDevicePath()),
-                key -> new ArrayList<>())
-            .add(dataPartitionQueryParam);
-      }
-      DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
-
-      Analysis analysis = new Analysis();
-      analysis.setSchemaTree(schemaTree);
-      analysis.setStatement(insertRowsStatement);
-      analysis.setDataPartitionInfo(dataPartition);
-
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitInsertMultiTablets(
-        InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext context) {
-      context.setQueryType(QueryType.WRITE);
-      // TODO remove duplicate
-      SchemaTree schemaTree =
-          schemaFetcher.fetchSchemaListWithAutoCreate(
-              insertMultiTabletsStatement.getDevicePaths(),
-              insertMultiTabletsStatement.getMeasurementsList(),
-              insertMultiTabletsStatement.getDataTypesList(),
-              insertMultiTabletsStatement.getAlignedList());
-
-      if (!insertMultiTabletsStatement.checkDataType(schemaTree)) {
-        throw new SemanticException("Data type mismatch");
-      }
-
-      Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
-      for (InsertTabletStatement insertTabletStatement :
-          insertMultiTabletsStatement.getInsertTabletStatementList()) {
-        DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
-        dataPartitionQueryParam.setDevicePath(insertTabletStatement.getDevicePath().getFullPath());
-        dataPartitionQueryParam.setTimePartitionSlotList(
-            insertTabletStatement.getTimePartitionSlots());
-        sgNameToQueryParamsMap
-            .computeIfAbsent(
-                schemaTree.getBelongedStorageGroup(insertTabletStatement.getDevicePath()),
-                key -> new ArrayList<>())
-            .add(dataPartitionQueryParam);
-      }
-      DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
-
-      Analysis analysis = new Analysis();
-      analysis.setSchemaTree(schemaTree);
-      analysis.setStatement(insertMultiTabletsStatement);
-      analysis.setDataPartitionInfo(dataPartition);
-
-      return analysis;
-    }
-
-    @Override
-    public Analysis visitInsertRowsOfOneDevice(
-        InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, MPPQueryContext context) {
-      context.setQueryType(QueryType.WRITE);
-      // TODO remove duplicate
-      SchemaTree schemaTree =
-          schemaFetcher.fetchSchemaWithAutoCreate(
-              insertRowsOfOneDeviceStatement.getDevicePath(),
-              insertRowsOfOneDeviceStatement.getMeasurements(),
-              insertRowsOfOneDeviceStatement.getDataTypes(),
-              insertRowsOfOneDeviceStatement.isAligned());
-
-      try {
-        insertRowsOfOneDeviceStatement.transferType(schemaTree);
-      } catch (QueryProcessException e) {
-        throw new SemanticException(e.getMessage());
-      }
-
-      if (!insertRowsOfOneDeviceStatement.checkDataType(schemaTree)) {
-        throw new SemanticException("Data type mismatch");
-      }
-
-      Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
-      DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
-      dataPartitionQueryParam.setDevicePath(
-          insertRowsOfOneDeviceStatement.getDevicePath().getFullPath());
-      dataPartitionQueryParam.setTimePartitionSlotList(
-          insertRowsOfOneDeviceStatement.getTimePartitionSlots());
-      sgNameToQueryParamsMap.put(
-          schemaTree.getBelongedStorageGroup(insertRowsOfOneDeviceStatement.getDevicePath()),
-          Collections.singletonList(dataPartitionQueryParam));
-      DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
-
-      Analysis analysis = new Analysis();
-      analysis.setSchemaTree(schemaTree);
-      analysis.setStatement(insertRowsOfOneDeviceStatement);
-      analysis.setDataPartitionInfo(dataPartition);
-
-      return analysis;
-    }
-
     @Override
     public Analysis visitSchemaFetch(
         SchemaFetchStatement schemaFetchStatement, MPPQueryContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java
index f0e7533e03..58519e303d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java
@@ -18,12 +18,8 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node;
 
-import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
-import org.apache.iotdb.db.wal.utils.WALWriteUtils;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import java.io.DataInputStream;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 
 public class PlanNodeId {
@@ -62,16 +58,4 @@ public class PlanNodeId {
   public void serialize(ByteBuffer byteBuffer) {
     ReadWriteIOUtils.write(id, byteBuffer);
   }
-
-  public int serializedSize() {
-    return ReadWriteIOUtils.sizeToWrite(id);
-  }
-
-  public void serializeToWAL(IWALByteBufferView buffer) {
-    WALWriteUtils.write(id, buffer);
-  }
-
-  public static PlanNodeId deserialize(DataInputStream stream) throws IOException {
-    return new PlanNodeId(ReadWriteIOUtils.readString(stream));
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
index 62cbffb55b..75de32d2c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
@@ -23,12 +23,21 @@ import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
+import org.apache.iotdb.db.wal.utils.WALWriteUtils;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
+import java.io.DataInputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 
 public abstract class InsertNode extends WritePlanNode {
@@ -131,6 +140,78 @@ public abstract class InsertNode extends WritePlanNode {
     this.deviceID = deviceID;
   }
 
+  public void serializeMeasurementSchemaToWAL(IWALByteBufferView buffer) {
+    for (MeasurementSchema measurementSchema : measurementSchemas) {
+      if (measurementSchema != null) {
+        WALWriteUtils.write(measurementSchema.getMeasurementId(), buffer);
+
+        WALWriteUtils.write(measurementSchema.getType(), buffer);
+
+        WALWriteUtils.write(measurementSchema.getEncodingType(), buffer);
+
+        WALWriteUtils.write(measurementSchema.getCompressor(), buffer);
+
+        Map<String, String> props = measurementSchema.getProps();
+        if (props == null) {
+          WALWriteUtils.write(0, buffer);
+        } else {
+          WALWriteUtils.write(props.size(), buffer);
+          for (Map.Entry<String, String> entry : props.entrySet()) {
+            WALWriteUtils.write(entry.getKey(), buffer);
+            WALWriteUtils.write(entry.getValue(), buffer);
+          }
+        }
+      }
+    }
+  }
+
+  public int serializeMeasurementSchemaSize() {
+    int byteLen = 0;
+    for (MeasurementSchema measurementSchema : measurementSchemas) {
+      if (measurementSchema != null) {
+        byteLen += ReadWriteIOUtils.sizeToWrite(measurementSchema.getMeasurementId());
+        byteLen += 3 * Byte.BYTES;
+        Map<String, String> props = measurementSchema.getProps();
+        if (props == null) {
+          byteLen += Integer.BYTES;
+        } else {
+          byteLen += Integer.BYTES;
+          for (Map.Entry<String, String> entry : props.entrySet()) {
+            byteLen += ReadWriteIOUtils.sizeToWrite(entry.getKey());
+            byteLen += ReadWriteIOUtils.sizeToWrite(entry.getValue());
+          }
+        }
+      }
+    }
+    return byteLen;
+  }
+
+  /** Make sure the measurement schema is already inited before calling this */
+  public void deserializeMeasurementSchema(DataInputStream stream) throws IOException {
+    for (int i = 0; i < measurementSchemas.length; i++) {
+
+      measurementSchemas[i] =
+          new MeasurementSchema(
+              ReadWriteIOUtils.readString(stream),
+              TSDataType.deserialize(ReadWriteIOUtils.readByte(stream)),
+              TSEncoding.deserialize(ReadWriteIOUtils.readByte(stream)),
+              CompressionType.deserialize(ReadWriteIOUtils.readByte(stream)));
+
+      int size = ReadWriteIOUtils.readInt(stream);
+      if (size > 0) {
+        Map<String, String> props = new HashMap<>();
+        String key;
+        String value;
+        for (int j = 0; j < size; j++) {
+          key = ReadWriteIOUtils.readString(stream);
+          value = ReadWriteIOUtils.readString(stream);
+          props.put(key, value);
+        }
+        measurementSchemas[i].setProps(props);
+      }
+    }
+  }
+
   public TRegionReplicaSet getRegionReplicaSet() {
     return dataRegionReplicaSet;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
index cf144c9d51..eb25abadfe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
@@ -106,7 +106,6 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
   public int serializedSize() {
     int size = 0;
     size += Short.BYTES;
-    size += this.getPlanNodeId().serializedSize();
     return size + subSerializeSize();
   }
 
@@ -120,11 +119,8 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
   int serializeMeasurementsAndValuesSize() {
     int size = 0;
     size += Integer.BYTES;
-    for (String m : measurements) {
-      if (m != null) {
-        size += ReadWriteIOUtils.sizeToWrite(m);
-      }
-    }
+
+    size += serializeMeasurementSchemaSize();
 
     // putValues
     for (int i = 0; i < values.length; i++) {
@@ -176,7 +172,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
 
   public static InsertRowNode deserialize(ByteBuffer byteBuffer) {
     // TODO: (xingtanzjr) remove placeholder
-    InsertRowNode insertNode = new InsertRowNode(new PlanNodeId("1"));
+    InsertRowNode insertNode = new InsertRowNode(new PlanNodeId(""));
     insertNode.setTime(byteBuffer.getLong());
     try {
       insertNode.setDevicePath(new PartialPath(ReadWriteIOUtils.readString(byteBuffer)));
@@ -197,9 +193,9 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
   void serializeMeasurementsAndValues(ByteBuffer buffer) {
     buffer.putInt(measurementSchemas.length - countFailedMeasurements());
 
-    for (MeasurementSchema measurement : measurementSchemas) {
-      if (measurement != null) {
-        measurement.serializeTo(buffer);
+    for (MeasurementSchema measurementSchema : measurementSchemas) {
+      if (measurementSchema != null) {
+        measurementSchema.serializeTo(buffer);
       }
     }
 
@@ -249,7 +245,6 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
   @Override
   public void serializeToWAL(IWALByteBufferView buffer) {
     buffer.putShort((short) PlanNodeType.INSERT_ROW.ordinal());
-    getPlanNodeId().serializeToWAL(buffer);
     subSerialize(buffer);
   }
 
@@ -262,11 +257,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
   void serializeMeasurementsAndValues(IWALByteBufferView buffer) {
     buffer.putInt(measurementSchemas.length - countFailedMeasurements());
 
-    for (String measurement : measurements) {
-      if (measurement != null) {
-        WALWriteUtils.write(measurement, buffer);
-      }
-    }
+    serializeMeasurementSchemaToWAL(buffer);
 
     try {
       putValues(buffer);
@@ -278,6 +269,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
   }
 
   private void putValues(IWALByteBufferView buffer) throws QueryProcessException {
+    // todo remove serialize datatype after serializing measurement schema
     for (int i = 0; i < values.length; i++) {
       if (dataTypes[i] != null) {
         if (values[i] == null) {
@@ -383,7 +375,9 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
 
   public static InsertRowNode deserialize(DataInputStream stream)
       throws IOException, IllegalPathException {
-    InsertRowNode insertNode = new InsertRowNode(PlanNodeId.deserialize(stream));
+    // This method is used for deserialize from wal
+    // we do not store plan node id in wal entry
+    InsertRowNode insertNode = new InsertRowNode(new PlanNodeId(""));
     insertNode.setTime(stream.readLong());
     insertNode.setDevicePath(new PartialPath(ReadWriteIOUtils.readString(stream)));
     insertNode.deserializeMeasurementsAndValues(stream);
@@ -394,10 +388,8 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
   void deserializeMeasurementsAndValues(DataInputStream stream) throws IOException {
     int measurementSize = stream.readInt();
 
-    this.measurements = new String[measurementSize];
-    for (int i = 0; i < measurementSize; i++) {
-      measurements[i] = ReadWriteIOUtils.readString(stream);
-    }
+    this.measurementSchemas = new MeasurementSchema[measurementSize];
+    deserializeMeasurementSchema(stream);
 
     this.dataTypes = new TSDataType[measurementSize];
     this.values = new Object[measurementSize];
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
index abf9ed08b0..e03cfcaab1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
@@ -161,7 +161,6 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
   public int serializedSize(int start, int end) {
     int size = 0;
     size += Short.BYTES;
-    size += this.getPlanNodeId().serializedSize();
     return size + subSerializeSize(start, end);
   }
 
@@ -170,11 +169,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
     size += ReadWriteIOUtils.sizeToWrite(devicePath.getFullPath());
     // measurements size
     size += Integer.BYTES;
-    for (String m : measurements) {
-      if (m != null) {
-        size += ReadWriteIOUtils.sizeToWrite(m);
-      }
-    }
+
+    size += serializeMeasurementSchemaSize();
+
     // data types size
     size += Integer.BYTES;
     for (int i = 0; i < dataTypes.length; i++) {
@@ -238,9 +235,8 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
   }
 
   @Override
-  public void serialize(ByteBuffer byteBuffer) {
-    byteBuffer.putShort((short) PlanNodeType.INSERT_TABLET.ordinal());
-    getPlanNodeId().serialize(byteBuffer);
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.INSERT_TABLET.serialize(byteBuffer);
     subSerialize(byteBuffer);
   }
 
@@ -357,7 +353,6 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
 
   public void serializeToWAL(IWALByteBufferView buffer, int start, int end) {
     buffer.putShort((short) PlanNodeType.INSERT_TABLET.ordinal());
-    getPlanNodeId().serializeToWAL(buffer);
     subSerialize(buffer, start, end);
   }
 
@@ -373,11 +368,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
 
   private void writeMeasurements(IWALByteBufferView buffer) {
     buffer.putInt(measurementSchemas.length - countFailedMeasurements());
-    for (String m : measurements) {
-      if (m != null) {
-        WALWriteUtils.write(m, buffer);
-      }
-    }
+    serializeMeasurementSchemaToWAL(buffer);
   }
 
   @Override
@@ -679,12 +670,13 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
   }
 
   public static InsertTabletNode deserialize(ByteBuffer byteBuffer) {
-    InsertTabletNode insertNode = new InsertTabletNode(PlanNodeId.deserialize(byteBuffer));
+    InsertTabletNode insertNode = new InsertTabletNode(new PlanNodeId(""));
     try {
       insertNode.subDeserialize(byteBuffer);
     } catch (IllegalPathException e) {
       throw new IllegalArgumentException("Cannot deserialize InsertRowNode", e);
     }
+    insertNode.setPlanNodeId(PlanNodeId.deserialize(byteBuffer));
     return insertNode;
   }
 
@@ -720,7 +712,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
 
   public static InsertTabletNode deserialize(DataInputStream stream)
       throws IllegalPathException, IOException {
-    InsertTabletNode insertNode = new InsertTabletNode(PlanNodeId.deserialize(stream));
+    // This method is used for deserialize from wal
+    // we do not store plan node id in wal entry
+    InsertTabletNode insertNode = new InsertTabletNode(new PlanNodeId(""));
     insertNode.subDeserialize(stream);
     return insertNode;
   }
@@ -729,10 +723,8 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
     this.devicePath = new PartialPath(ReadWriteIOUtils.readString(stream));
 
     int measurementSize = stream.readInt();
-    this.measurements = new String[measurementSize];
-    for (int i = 0; i < measurementSize; i++) {
-      measurements[i] = ReadWriteIOUtils.readString(stream);
-    }
+    this.measurementSchemas = new MeasurementSchema[measurementSize];
+    deserializeMeasurementSchema(stream);
 
     this.dataTypes = new TSDataType[measurementSize];
     for (int i = 0; i < measurementSize; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java
index b0e6ddbacd..314af7074b 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java
@@ -19,7 +19,9 @@
 package org.apache.iotdb.db.wal.utils;
 
 import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
@@ -131,4 +133,14 @@ public class WALWriteUtils {
     byte n = dataType.serialize();
     return write(n, buffer);
   }
+
+  public static int write(TSEncoding encoding, IWALByteBufferView buffer) {
+    byte n = encoding.serialize();
+    return write(n, buffer);
+  }
+
+  public static int write(CompressionType compressionType, IWALByteBufferView buffer) {
+    byte n = compressionType.serialize();
+    return write(n, buffer);
+  }
 }